http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java new file mode 100644 index 0000000..5f05b24 --- /dev/null +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/ParserRunnerImplTest.java @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.ParserConfigurations; +import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.message.metadata.RawMessage; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.utils.ReflectionUtils; +import org.apache.metron.parsers.filters.Filters; +import org.apache.metron.parsers.filters.StellarFilter; +import org.apache.metron.parsers.interfaces.MessageFilter; +import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.parsers.topology.ParserComponent; +import org.apache.metron.parsers.ParserRunnerImpl.ProcessResult; +import org.apache.metron.stellar.dsl.Context; +import org.json.simple.JSONObject; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ParserRunnerImpl.class, ReflectionUtils.class, Filters.class}) +public class ParserRunnerImplTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + /** + { + "fieldValidations" : [ + { + "input" : [ "ip_src_addr", "ip_dst_addr"], + "validation" : "IP" + } + ] + } + */ + @Multiline + private String globalConfigString; + + /** + { + "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser", + "filterClassName":"org.apache.metron.parsers.filters.StellarFilter", + "sensorTopic":"bro", + "parserConfig": { + "field": "value" + }, + "fieldTransformations" : [ + { + "input" : "field1", + "transformation" : "REMOVE" + } + ] + } + */ + @Multiline + private String broConfigString; + + /** + { + "parserClassName":"org.apache.metron.parsers.snort.BasicSnortParser", + "sensorTopic":"snort", + "parserConfig": {} + } + */ + @Multiline + private String snortConfigString; + + private ParserConfigurations parserConfigurations; + private MessageParser<JSONObject> broParser; + private MessageParser<JSONObject> snortParser; + private MessageFilter<JSONObject> stellarFilter; + private ParserRunnerImpl parserRunner; + + + @Before + public void setup() throws IOException { + parserConfigurations = new ParserConfigurations(); + SensorParserConfig broConfig = SensorParserConfig.fromBytes(broConfigString.getBytes()); + SensorParserConfig snortConfig = SensorParserConfig.fromBytes(snortConfigString.getBytes()); + parserConfigurations.updateSensorParserConfig("bro", broConfig); + parserConfigurations.updateSensorParserConfig("snort", snortConfig); + parserConfigurations.updateGlobalConfig(JSONUtils.INSTANCE.load(globalConfigString, JSONUtils.MAP_SUPPLIER)); + parserRunner = new ParserRunnerImpl(new HashSet<>(Arrays.asList("bro", "snort"))); + broParser = mock(MessageParser.class); + snortParser = mock(MessageParser.class); + stellarFilter = mock(StellarFilter.class); + mockStatic(ReflectionUtils.class); + mockStatic(Filters.class); + + when(ReflectionUtils.createInstance("org.apache.metron.parsers.bro.BasicBroParser")).thenReturn(broParser); + when(ReflectionUtils.createInstance("org.apache.metron.parsers.snort.BasicSnortParser")).thenReturn(snortParser); + when(Filters.get("org.apache.metron.parsers.filters.StellarFilter", broConfig.getParserConfig())) + .thenReturn(stellarFilter); + + } + + @Test + public void shouldThrowExceptionOnEmptyParserSupplier() { + exception.expect(IllegalStateException.class); + exception.expectMessage("A parser config supplier must be set before initializing the ParserRunner."); + + parserRunner.init(null, null); + } + + @Test + public void shouldThrowExceptionOnEmptyStellarContext() { + exception.expect(IllegalStateException.class); + exception.expectMessage("A stellar context must be set before initializing the ParserRunner."); + + parserRunner.init(() -> parserConfigurations, null); + } + + @Test + public void initShouldThrowExceptionOnMissingSensorParserConfig() { + exception.expect(IllegalStateException.class); + exception.expectMessage("Could not initialize parsers. Cannot find configuration for sensor test."); + + parserRunner = new ParserRunnerImpl(new HashSet<String>() {{ + add("test"); + }}); + + parserRunner.init(() -> parserConfigurations, mock(Context.class)); + } + + @Test + public void executeShouldThrowExceptionOnMissingSensorParserConfig() { + exception.expect(IllegalStateException.class); + exception.expectMessage("Could not execute parser. Cannot find configuration for sensor test."); + + parserRunner = new ParserRunnerImpl(new HashSet<String>() {{ + add("test"); + }}); + + parserRunner.execute("test", mock(RawMessage.class), parserConfigurations); + } + + @Test + public void shouldInit() throws Exception { + Context stellarContext = mock(Context.class); + Map<String, Object> broParserConfig = parserConfigurations.getSensorParserConfig("bro").getParserConfig(); + Map<String, Object> snortParserConfig = parserConfigurations.getSensorParserConfig("snort").getParserConfig(); + + parserRunner.init(() -> parserConfigurations, stellarContext); + + { + // Verify Stellar context + Assert.assertEquals(stellarContext, parserRunner.getStellarContext()); + } + + Map<String, ParserComponent> sensorToParserComponentMap = parserRunner.getSensorToParserComponentMap(); + { + // Verify Bro parser initialization + Assert.assertEquals(2, sensorToParserComponentMap.size()); + ParserComponent broComponent = sensorToParserComponentMap.get("bro"); + Assert.assertEquals(broParser, broComponent.getMessageParser()); + Assert.assertEquals(stellarFilter, broComponent.getFilter()); + verify(broParser, times(1)).init(); + verify(broParser, times(1)).configure(broParserConfig); + verifyNoMoreInteractions(broParser); + verifyNoMoreInteractions(stellarFilter); + } + { + // Verify Snort parser initialization + ParserComponent snortComponent = sensorToParserComponentMap.get("snort"); + Assert.assertEquals(snortParser, snortComponent.getMessageParser()); + Assert.assertNull(snortComponent.getFilter()); + verify(snortParser, times(1)).init(); + verify(snortParser, times(1)).configure(snortParserConfig); + verifyNoMoreInteractions(snortParser); + } + } + + @Test + public void shouldExecute() { + parserRunner = spy(parserRunner); + RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>()); + JSONObject parsedMessage1 = new JSONObject(); + parsedMessage1.put("field", "parsedMessage1"); + JSONObject parsedMessage2 = new JSONObject(); + parsedMessage2.put("field", "parsedMessage2"); + Object rawMessage1 = new RawMessage("raw_message1".getBytes(), new HashMap<>()); + Object rawMessage2 = new RawMessage("raw_message2".getBytes(), new HashMap<>()); + Throwable throwable1 = mock(Throwable.class); + Throwable throwable2 = mock(Throwable.class); + MessageParserResult<JSONObject> messageParserResult = new DefaultMessageParserResult<>(Arrays.asList(parsedMessage1, parsedMessage2), + new HashMap<Object, Throwable>(){{ + put(rawMessage1, throwable1); + put(rawMessage2, throwable2); + }}); + JSONObject processedMessage = new JSONObject(); + processedMessage.put("field", "processedMessage1"); + MetronError processedError = new MetronError().withMessage("processedError"); + ProcessResult processedMessageResult = mock(ProcessResult.class); + ProcessResult processedErrorResult = mock(ProcessResult.class); + + when(broParser.parseOptionalResult(rawMessage.getMessage())).thenReturn(Optional.of(messageParserResult)); + when(processedMessageResult.getMessage()).thenReturn(processedMessage); + when(processedErrorResult.isError()).thenReturn(true); + when(processedErrorResult.getError()).thenReturn(processedError); + doReturn(Optional.of(processedMessageResult)).when(parserRunner) + .processMessage("bro", parsedMessage1, rawMessage, broParser, parserConfigurations); + doReturn(Optional.of(processedErrorResult)).when(parserRunner) + .processMessage("bro", parsedMessage2, rawMessage, broParser, parserConfigurations); + + MetronError expectedParseError1 = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(throwable1) + .withSensorType(Collections.singleton("bro")) + .addRawMessage(rawMessage1); + MetronError expectedParseError2 = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(throwable2) + .withSensorType(Collections.singleton("bro")) + .addRawMessage(rawMessage2); + + parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{ + put("bro", new ParserComponent(broParser, stellarFilter)); + }}); + ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute("bro", rawMessage, parserConfigurations); + + Assert.assertEquals(1, parserRunnerResults.getMessages().size()); + Assert.assertTrue(parserRunnerResults.getMessages().contains(processedMessage)); + Assert.assertEquals(3, parserRunnerResults.getErrors().size()); + Assert.assertTrue(parserRunnerResults.getErrors().contains(processedError)); + Assert.assertTrue(parserRunnerResults.getErrors().contains(expectedParseError1)); + Assert.assertTrue(parserRunnerResults.getErrors().contains(expectedParseError2)); + } + + @Test + public void shouldExecuteWithMasterThrowable() { + parserRunner = spy(parserRunner); + RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>()); + Throwable masterThrowable = mock(Throwable.class); + MessageParserResult<JSONObject> messageParserResult = new DefaultMessageParserResult<>(masterThrowable); + + + when(broParser.parseOptionalResult(rawMessage.getMessage())).thenReturn(Optional.of(messageParserResult)); + + parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{ + put("bro", new ParserComponent(broParser, stellarFilter)); + }}); + ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute("bro", rawMessage, parserConfigurations); + + verify(parserRunner, times(0)) + .processMessage(any(), any(), any(), any(), any()); + + MetronError expectedError = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(masterThrowable) + .withSensorType(Collections.singleton("bro")) + .addRawMessage(rawMessage.getMessage()); + Assert.assertEquals(1, parserRunnerResults.getErrors().size()); + Assert.assertTrue(parserRunnerResults.getErrors().contains(expectedError)); + } + + @Test + public void shouldPopulateMessagesOnProcessMessage() { + JSONObject inputMessage = new JSONObject(); + inputMessage.put("guid", "guid"); + inputMessage.put("ip_src_addr", "192.168.1.1"); + inputMessage.put("ip_dst_addr", "192.168.1.2"); + inputMessage.put("field1", "value"); + RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>()); + + JSONObject expectedOutput = new JSONObject(); + expectedOutput.put("guid", "guid"); + expectedOutput.put("source.type", "bro"); + expectedOutput.put("ip_src_addr", "192.168.1.1"); + expectedOutput.put("ip_dst_addr", "192.168.1.2"); + + when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true); + when(broParser.validate(expectedOutput)).thenReturn(true); + + parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{ + put("bro", new ParserComponent(broParser, stellarFilter)); + }}); + + Optional<ParserRunnerImpl.ProcessResult> processResult = parserRunner.processMessage("bro", inputMessage, rawMessage, broParser, parserConfigurations); + + Assert.assertTrue(processResult.isPresent()); + Assert.assertFalse(processResult.get().isError()); + Assert.assertEquals(expectedOutput, processResult.get().getMessage()); + } + + @Test + public void shouldReturnMetronErrorOnInvalidMessage() { + JSONObject inputMessage = new JSONObject(); + inputMessage.put("guid", "guid"); + RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>()); + + JSONObject expectedOutput = new JSONObject(); + expectedOutput.put("guid", "guid"); + expectedOutput.put("source.type", "bro"); + MetronError expectedMetronError = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_INVALID) + .withSensorType(Collections.singleton("bro")) + .addRawMessage(inputMessage); + + when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true); + when(broParser.validate(expectedOutput)).thenReturn(false); + + parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{ + put("bro", new ParserComponent(broParser, stellarFilter)); + }}); + + Optional<ParserRunnerImpl.ProcessResult> processResult = parserRunner.processMessage("bro", inputMessage, rawMessage, broParser, parserConfigurations); + + Assert.assertTrue(processResult.isPresent()); + Assert.assertTrue(processResult.get().isError()); + Assert.assertEquals(expectedMetronError, processResult.get().getError()); + } + + @Test + public void shouldReturnMetronErrorOnFailedFieldValidator() { + JSONObject inputMessage = new JSONObject(); + inputMessage.put("guid", "guid"); + inputMessage.put("ip_src_addr", "test"); + inputMessage.put("ip_dst_addr", "test"); + RawMessage rawMessage = new RawMessage("raw_message".getBytes(), new HashMap<>()); + + JSONObject expectedOutput = new JSONObject(); + expectedOutput.put("guid", "guid"); + expectedOutput.put("ip_src_addr", "test"); + expectedOutput.put("ip_dst_addr", "test"); + expectedOutput.put("source.type", "bro"); + MetronError expectedMetronError = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_INVALID) + .withSensorType(Collections.singleton("bro")) + .addRawMessage(inputMessage) + .withErrorFields(new HashSet<>(Arrays.asList("ip_src_addr", "ip_dst_addr"))); + + when(stellarFilter.emit(expectedOutput, parserRunner.getStellarContext())).thenReturn(true); + when(broParser.validate(expectedOutput)).thenReturn(true); + + parserRunner.setSensorToParserComponentMap(new HashMap<String, ParserComponent>() {{ + put("bro", new ParserComponent(broParser, stellarFilter)); + }}); + + Optional<ParserRunnerImpl.ProcessResult> processResult = parserRunner.processMessage("bro", inputMessage, rawMessage, broParser, parserConfigurations); + + Assert.assertTrue(processResult.isPresent()); + Assert.assertTrue(processResult.get().isError()); + Assert.assertEquals(expectedMetronError, processResult.get().getError()); + } +}
http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java index e5e7180..9f58d1c 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java @@ -17,719 +17,428 @@ */ package org.apache.metron.parsers.bolt; -import com.google.common.collect.ImmutableList; -import org.adrianwalker.multilinestring.Multiline; -import org.apache.curator.framework.CuratorFramework; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.metron.common.Constants; -import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; -import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; -import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.error.MetronError; -import org.apache.metron.common.writer.BulkMessageWriter; -import org.apache.metron.common.writer.BulkWriterResponse; -import org.apache.metron.common.writer.MessageWriter; -import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; -import org.apache.metron.parsers.DefaultMessageParserResult; -import org.apache.metron.parsers.interfaces.MessageFilter; -import org.apache.metron.parsers.interfaces.MessageParserResult; -import org.apache.metron.parsers.interfaces.MultilineMessageParser; -import org.apache.metron.parsers.topology.ParserComponents; +import org.apache.metron.common.message.MessageGetStrategy; +import org.apache.metron.common.message.metadata.RawMessage; +import org.apache.metron.parsers.DefaultParserRunnerResults; +import org.apache.metron.parsers.ParserRunnerResults; +import org.apache.metron.parsers.ParserRunnerImpl; import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration; import org.apache.metron.test.bolt.BaseBoltTest; import org.apache.metron.test.error.MetronErrorJSONMatcher; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; +import org.apache.storm.Config; import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.mockito.Mock; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.function.Supplier; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ParserBoltTest extends BaseBoltTest { - @Mock - private MultilineMessageParser<JSONObject> parser; - - @Mock - private MessageWriter<JSONObject> writer; - - @Mock - private BulkMessageWriter<JSONObject> batchWriter; + @Rule + public final ExpectedException exception = ExpectedException.none(); @Mock - private MessageFilter<JSONObject> filter; + private Tuple t1; @Mock - private Tuple t1; + private ParserRunnerImpl parserRunner; @Mock - private Tuple t2; + private WriterHandler writerHandler; @Mock - private Tuple t3; + private WriterHandler writerHandlerHandleAck; @Mock - private Tuple t4; + private MessageGetStrategy messageGetStrategy; @Mock - private Tuple t5; + private Context stellarContext; - private static class RecordingWriter implements BulkMessageWriter<JSONObject> { - List<JSONObject> records = new ArrayList<>(); + private class MockParserRunner extends ParserRunnerImpl { - @Override - public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception { + private boolean isInvalid = false; + private RawMessage rawMessage; + private JSONObject message; + public MockParserRunner(HashSet<String> sensorTypes) { + super(sensorTypes); } @Override - public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception { - records.addAll(messages); - BulkWriterResponse ret = new BulkWriterResponse(); - ret.addAllSuccesses(tuples); - return ret; + public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) { + DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults(); + this.rawMessage = rawMessage; + if (!isInvalid) { + parserRunnerResults.addMessage(message); + } else { + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_INVALID) + .withSensorType(Collections.singleton(sensorType)) + .addRawMessage(message); + parserRunnerResults.addError(error); + } + return parserRunnerResults; } - @Override - public String getName() { - return "recording"; + protected void setInvalid(boolean isInvalid) { + this.isInvalid = isInvalid; } - @Override - public void close() throws Exception { - + protected void setMessage(JSONObject message) { + this.message = message; } - public List<JSONObject> getRecords() { - return records; + protected RawMessage getRawMessage() { + return rawMessage; } } - private static ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return createUpdater(Optional.empty()); - } - private static ConfigurationsUpdater<ParserConfigurations> createUpdater(Optional<Integer> batchSize) { - return new ConfigurationsUpdater<ParserConfigurations>(null, null) { - @Override - public void update(CuratorFramework client, String path, byte[] data) throws IOException { } + @Before + public void setup() { + when(writerHandler.handleAck()).thenReturn(false); + when(writerHandlerHandleAck.handleAck()).thenReturn(true); - @Override - public void delete(CuratorFramework client, String path, byte[] data) throws IOException { } - - @Override - public ConfigurationType getType() { - return ConfigurationType.PARSER; - } - - @Override - public void update(String name, byte[] data) throws IOException { } - - @Override - public void delete(String name) { } - - @Override - public Class<ParserConfigurations> getConfigurationClass() { - return ParserConfigurations.class; - } - - @Override - public void forceUpdate(CuratorFramework client) { } - - @Override - public ParserConfigurations defaultConfigurations() { - return new ParserConfigurations() { - @Override - public SensorParserConfig getSensorParserConfig(String sensorType) { - return new SensorParserConfig() { - @Override - public Map<String, Object> getParserConfig() { - return new HashMap<String, Object>() {{ - if(batchSize.isPresent()) { - put(IndexingConfigurations.BATCH_SIZE_CONF, batchSize.get()); - } - }}; - } - }; - } - }; - } - }; } - @Test - public void testEmpty() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - null, - new WriterHandler(writer) - ) - ); - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { - @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(); - } - }; - - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(writer, times(1)).init(); - byte[] sampleBinary = "some binary message".getBytes(); - - when(tuple.getBinary(0)).thenReturn(sampleBinary); - when(parser.parseOptionalResult(sampleBinary)).thenReturn(null); - parserBolt.execute(tuple); - verify(parser, times(0)).validate(any()); - verify(writer, times(0)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), any()); - verify(outputCollector, times(1)).ack(tuple); - - MetronError error = new MetronError() - .withErrorType(Constants.ErrorType.PARSER_ERROR) - .withThrowable(new NullPointerException()) - .withSensorType(Collections.singleton(sensorType)) - .addRawMessage(sampleBinary); - verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); + public void shouldThrowExceptionOnDifferentHandleAck() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("All writers must match when calling handleAck()"); + + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + put("bro", writerHandlerHandleAck); + }}); } @Test - public void testInvalid() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - null, - new WriterHandler(writer) - ) - ); - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { - @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(); - } - }; + public void withBatchTimeoutDivisorShouldSetBatchTimeoutDivisor() { + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}).withBatchTimeoutDivisor(5); - buildGlobalConfig(parserBolt); + Assert.assertEquals(5, parserBolt.getBatchTimeoutDivisor()); + } - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - byte[] sampleBinary = "some binary message".getBytes(); - - when(tuple.getBinary(0)).thenReturn(sampleBinary); - JSONObject parsedMessage = new JSONObject(); - parsedMessage.put("field", "invalidValue"); - parsedMessage.put("guid", "this-is-unique-identifier-for-tuple"); - List<JSONObject> messageList = new ArrayList<>(); - messageList.add(parsedMessage); - when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messageList))); - when(parser.validate(parsedMessage)).thenReturn(true); - parserBolt.execute(tuple); + @Test + public void shouldThrowExceptionOnInvalidBatchTimeoutDivisor() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("batchTimeoutDivisor must be positive. Value provided was -1"); - MetronError error = new MetronError() - .withErrorType(Constants.ErrorType.PARSER_INVALID) - .withSensorType(Collections.singleton(sensorType)) - .withErrorFields(new HashSet<String>() {{ add("field"); }}) - .addRawMessage(new JSONObject(){{ - put("field", "invalidValue"); - put("source.type", "yaf"); - put("guid", "this-is-unique-identifier-for-tuple"); - }}); - verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}).withBatchTimeoutDivisor(-1); } @Test - public void test() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - null, - new WriterHandler(writer) - ) - ); - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { + public void shouldGetComponentConfiguration() { + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}) { + @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(); + public ParserConfigurations getConfigurations() { + ParserConfigurations configurations = new ParserConfigurations(); + SensorParserConfig sensorParserConfig = new SensorParserConfig(); + sensorParserConfig.setParserConfig(new HashMap<String, Object>() {{ + put(IndexingConfigurations.BATCH_SIZE_CONF, 10); + }}); + configurations.updateSensorParserConfig("yaf", sensorParserConfig); + return configurations; } }; - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(writer, times(1)).init(); - byte[] sampleBinary = "some binary message".getBytes(); - JSONParser jsonParser = new JSONParser(); - final JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"guid\": \"this-is-unique-identifier-for-tuple\" }"); - final JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"guid\": \"this-is-unique-identifier-for-tuple\" }"); - List<JSONObject> messages = new ArrayList<JSONObject>() {{ - add(sampleMessage1); - add(sampleMessage2); - }}; - final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }"); - final JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }"); - when(tuple.getBinary(0)).thenReturn(sampleBinary); - when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messages))); - when(parser.validate(eq(messages.get(0)))).thenReturn(true); - when(parser.validate(eq(messages.get(1)))).thenReturn(false); - parserBolt.execute(tuple); - verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage1)); - verify(outputCollector, times(1)).ack(tuple); - when(parser.validate(eq(messages.get(0)))).thenReturn(true); - when(parser.validate(eq(messages.get(1)))).thenReturn(true); - when(filter.emitTuple(eq(messages.get(0)), any())).thenReturn(false); - when(filter.emitTuple(eq(messages.get(1)), any())).thenReturn(true); - parserBolt.execute(tuple); - verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2)); - verify(outputCollector, times(2)).ack(tuple); - doThrow(new Exception()).when(writer).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2)); - parserBolt.execute(tuple); - verify(outputCollector, times(1)).reportError(any(Throwable.class)); - } - /** - { - "filterClassName" : "STELLAR" - ,"parserConfig" : { - "filter.query" : "exists(field1)" - } - } - */ - @Multiline - public static String sensorParserConfig; - - /** - * Tests to ensure that a message that is unfiltered results in one write and an ack. - * @throws Exception - */ - @Test - public void testFilterSuccess() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - null, - new WriterHandler(batchWriter) - ) - ); - ParserBolt parserBolt = buildParserBolt(parserMap, sensorParserConfig); - - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any(), any()); - BulkWriterResponse successResponse = mock(BulkWriterResponse.class); - when(successResponse.getSuccesses()).thenReturn(ImmutableList.of(t1)); - when(batchWriter.write(any(), any(), any(), any())).thenReturn(successResponse); - when(parser.validate(any())).thenReturn(true); - when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{ - put("field1", "blah"); - }}))))); - parserBolt.execute(t1); - verify(batchWriter, times(1)).write(any(), any(), any(), any()); - verify(outputCollector, times(1)).ack(t1); + Map<String, Object> componentConfiguration = parserBolt.getComponentConfiguration(); + Assert.assertEquals(1, componentConfiguration.size()); + Assert.assertEquals( 14, componentConfiguration.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)); } - - /** - * Tests to ensure that a message filtered out results in no writes, but an ack. - * @throws Exception - */ @Test - public void testFilterFailure() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - null, - new WriterHandler(batchWriter) - ) - ); - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { + public void shouldPrepare() { + Map stormConf = mock(Map.class); + SensorParserConfig yafConfig = mock(SensorParserConfig.class); + when(yafConfig.getSensorTopic()).thenReturn("yafTopic"); + when(yafConfig.getParserConfig()).thenReturn(new HashMap<String, Object>() {{ + put(IndexingConfigurations.BATCH_SIZE_CONF, 10); + }}); + ParserConfigurations parserConfigurations = mock(ParserConfigurations.class); + + ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}) { + @Override protected SensorParserConfig getSensorParserConfig(String sensorType) { - try { - return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig)); - } catch (IOException e) { - throw new RuntimeException(e); + if ("yaf".equals(sensorType)) { + return yafConfig; } + return null; } @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(); + public ParserConfigurations getConfigurations() { + return parserConfigurations; } - }; + }); + doReturn(stellarContext).when(parserBolt).initializeStellar(); parserBolt.setCuratorFramework(client); parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any(), any()); - when(parser.validate(any())).thenReturn(true); - when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{ - put("field2", "blah"); - }}))))); - parserBolt.execute(t1); - verify(batchWriter, times(0)).write(any(), any(), any(), any()); - verify(outputCollector, times(1)).ack(t1); - } - /** - { - "sensorTopic":"dummy" - ,"parserConfig": { - "batchSize" : 1 - } - ,"fieldTransformations" : [ - { - "transformation" : "STELLAR" - ,"output" : "timestamp" - ,"config" : { - "timestamp" : "TO_EPOCH_TIMESTAMP(timestampstr, 'yyyy-MM-dd HH:mm:ss', 'UTC')" - } - } - ] - } - */ - @Multiline - public static String csvWithFieldTransformations; - @Test - public void testFieldTransformationPriorToValidation() { - String sensorType = "dummy"; - RecordingWriter recordingWriter = new RecordingWriter(); - //create a parser which acts like a basic parser but returns no timestamp field. - MultilineMessageParser<JSONObject> dummyParser = new MultilineMessageParser<JSONObject>() { - @Override - public void configure(Map<String, Object> config) { - } + parserBolt.prepare(stormConf, topologyContext, outputCollector); - @Override - public void init() { - } + verify(parserRunner, times(1)).init(any(Supplier.class), eq(stellarContext)); + verify(yafConfig, times(1)).init(); + Map<String, String> topicToSensorMap = parserBolt.getTopicToSensorMap(); + Assert.assertEquals(1, topicToSensorMap.size()); + Assert.assertEquals("yaf", topicToSensorMap.get("yafTopic")); + verify(writerHandler).init(stormConf, topologyContext, outputCollector, parserConfigurations); + verify(writerHandler).setDefaultBatchTimeout(14); + } - @Override - public boolean validate(JSONObject message) { - Object timestampObject = message.get(Constants.Fields.TIMESTAMP.getName()); - if (timestampObject instanceof Long) { - Long timestamp = (Long) timestampObject; - return timestamp > 0; - } - return false; - } + @Test + public void shouldThrowExceptionOnMissingConfig() { + exception.expect(IllegalStateException.class); + exception.expectMessage("Unable to retrieve a parser config for yaf"); - @Override - @SuppressWarnings("unchecked") - public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) { - return Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject() {{ - put("data", "foo"); - put("timestampstr", "2016-01-05 17:02:30"); - put("original_string", "blah"); - }}))); - } - }; + Map stormConf = mock(Map.class); - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - dummyParser, - null, - new WriterHandler(recordingWriter) - ) - ); - ParserBolt parserBolt = buildParserBolt(parserMap, csvWithFieldTransformations); + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}); parserBolt.setCuratorFramework(client); parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - when(t1.getBinary(0)).thenReturn(new byte[] {}); - parserBolt.execute(t1); - Assert.assertEquals(1, recordingWriter.getRecords().size()); - long expected = 1452013350000L; - Assert.assertEquals(expected, recordingWriter.getRecords().get(0).get("timestamp")); + + parserBolt.prepare(stormConf, topologyContext, outputCollector); } + @Test - public void testDefaultBatchSize() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - filter, - new WriterHandler(batchWriter) - ) - ); - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { + public void executeShouldHandleTickTuple() throws Exception { + when(t1.getSourceComponent()).thenReturn("__system"); + when(t1.getSourceStreamId()).thenReturn("__tick"); + ParserConfigurations parserConfigurations = mock(ParserConfigurations.class); + + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}) { + @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - // this uses default batch size - return ParserBoltTest.createUpdater(); + public ParserConfigurations getConfigurations() { + return parserConfigurations; } }; - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any(), any()); - when(parser.validate(any())).thenReturn(true); - when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); - when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); - BulkWriterResponse response = new BulkWriterResponse(); - Tuple[] uniqueTuples = new Tuple[ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE]; - for (int i=0; i < uniqueTuples.length; i++) { - uniqueTuples[i] = mock(Tuple.class); - response.addSuccess(uniqueTuples[i]); - } - when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(new HashSet<>(Arrays.asList(uniqueTuples))), any())).thenReturn(response); - for (Tuple tuple : uniqueTuples) { - parserBolt.execute(tuple); - } - for (Tuple uniqueTuple : uniqueTuples) { - verify(outputCollector, times(1)).ack(uniqueTuple); - } + parserBolt.setMessageGetStrategy(messageGetStrategy); + parserBolt.setOutputCollector(outputCollector); + + parserBolt.execute(t1); + + verify(writerHandler, times(1)).flush(parserConfigurations, messageGetStrategy); + verify(outputCollector, times(1)).ack(t1); } @Test - public void testLessRecordsThanDefaultBatchSize() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - filter, - new WriterHandler(batchWriter) - ) - ); - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { + public void shouldExecuteOnSuccess() throws Exception { + when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8)); + when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic"); + MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }}); + ParserConfigurations parserConfigurations = new ParserConfigurations(); + parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig()); + + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}) { + @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - // this uses default batch size - return ParserBoltTest.createUpdater(); + public ParserConfigurations getConfigurations() { + return parserConfigurations; } }; - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any(), any()); - when(parser.validate(any())).thenReturn(true); - when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); - when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); - int oneLessThanDefaultBatchSize = ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE - 1; - BulkWriterResponse response = new BulkWriterResponse(); - Tuple[] uniqueTuples = new Tuple[oneLessThanDefaultBatchSize]; - for (int i=0; i < uniqueTuples.length; i++) { - uniqueTuples[i] = mock(Tuple.class); - response.addSuccess(uniqueTuples[i]); + parserBolt.setMessageGetStrategy(messageGetStrategy); + parserBolt.setOutputCollector(outputCollector); + parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{ + put("yafTopic", "yaf"); + }}); + JSONObject message = new JSONObject(); + message.put("field", "value"); + mockParserRunner.setMessage(message); + RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>()); + + { + // Verify the correct message is written and ack is handled + parserBolt.execute(t1); + + Assert.assertEquals(expectedRawMessage, mockParserRunner.getRawMessage()); + verify(writerHandler, times(1)).write("yaf", t1, message, parserConfigurations, messageGetStrategy); + verify(outputCollector, times(1)).ack(t1); } - for (Tuple tuple : uniqueTuples) { - parserBolt.execute(tuple); + { + // Verify the tuple is not acked when the writer is set to handle ack + reset(outputCollector); + parserBolt.setSensorToWriterMap(new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandlerHandleAck); + }}); + + parserBolt.execute(t1); + + verify(writerHandlerHandleAck, times(1)).write("yaf", t1, message, parserConfigurations, messageGetStrategy); + verify(outputCollector, times(0)).ack(t1); } - // should have no acking yet - batch size not fulfilled - verify(outputCollector, never()).ack(any(Tuple.class)); - response.addSuccess(t1); // used to achieve count in final verify - Iterable<Tuple> tuples = new HashSet(Arrays.asList(uniqueTuples)) {{ - add(t1); - }}; - when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response); - // meet batch size requirement and now it should ack - parserBolt.execute(t1); - verify(outputCollector, times(ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE)).ack(any(Tuple.class)); } @Test - public void testBatchOfOne() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - filter, - new WriterHandler(batchWriter) - ) - ); - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { + public void shouldExecuteOnError() throws Exception { + when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8)); + when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic"); + MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ + add("yaf"); + }}); + mockParserRunner.setInvalid(true); + ParserConfigurations parserConfigurations = new ParserConfigurations(); + parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig()); + + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}) { + @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(Optional.of(1)); + public ParserConfigurations getConfigurations() { + return parserConfigurations; } }; - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any(), any()); - when(parser.validate(any())).thenReturn(true); - when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); - when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); - BulkWriterResponse response = new BulkWriterResponse(); - response.addSuccess(t1); - when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)), any())).thenReturn(response); + parserBolt.setMessageGetStrategy(messageGetStrategy); + parserBolt.setOutputCollector(outputCollector); + parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{ + put("yafTopic", "yaf"); + }}); + JSONObject message = new JSONObject(); + message.put("field", "value"); + mockParserRunner.setMessage(message); + RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>()); + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_INVALID) + .withSensorType(Collections.singleton("yaf")) + .addRawMessage(message); + parserBolt.execute(t1); + + Assert.assertEquals(expectedRawMessage, mockParserRunner.getRawMessage()); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), + argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); verify(outputCollector, times(1)).ack(t1); + } @Test - public void testBatchOfFive() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - filter, - new WriterHandler(batchWriter) - ) - ); - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { - @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(Optional.of(5)); - } - } ; - - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any(), any()); - when(parser.validate(any())).thenReturn(true); - when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); - when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); - Set<Tuple> tuples = Stream.of(t1, t2, t3, t4, t5).collect(Collectors.toSet()); - BulkWriterResponse response = new BulkWriterResponse(); - response.addAllSuccesses(tuples); - when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response); - writeNonBatch(outputCollector, parserBolt, t1); - writeNonBatch(outputCollector, parserBolt, t2); - writeNonBatch(outputCollector, parserBolt, t3); - writeNonBatch(outputCollector, parserBolt, t4); - parserBolt.execute(t5); - verify(batchWriter, times(1)).write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any()); - verify(outputCollector, times(1)).ack(t1); - verify(outputCollector, times(1)).ack(t2); - verify(outputCollector, times(1)).ack(t3); - verify(outputCollector, times(1)).ack(t4); - verify(outputCollector, times(1)).ack(t5); + public void shouldThrowExceptionOnFailedExecute() { + when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8)); + when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic"); + ParserConfigurations parserConfigurations = new ParserConfigurations(); + parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig()); + doThrow(new IllegalStateException("parserRunner.execute failed")).when(parserRunner).execute(eq("yaf"), any(), eq(parserConfigurations)); - } + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}) { - @Test - public void testBatchOfFiveWithError() throws Exception { - String sensorType = "yaf"; - Map<String, ParserComponents> parserMap = Collections.singletonMap( - sensorType, - new ParserComponents( - parser, - filter, - new WriterHandler(batchWriter) - ) - ); - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) { @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(Optional.of(5)); + public ParserConfigurations getConfigurations() { + return parserConfigurations; } }; - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any(), any()); - - doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any()); - when(parser.validate(any())).thenReturn(true); - when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); - when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); - parserBolt.execute(t1); - parserBolt.execute(t2); - parserBolt.execute(t3); - parserBolt.execute(t4); - parserBolt.execute(t5); - verify(batchWriter, times(1)).write(any(), any(), any(), any()); - verify(outputCollector, times(1)).ack(t1); - verify(outputCollector, times(1)).ack(t2); - verify(outputCollector, times(1)).ack(t3); - verify(outputCollector, times(1)).ack(t4); - verify(outputCollector, times(1)).ack(t5); + parserBolt.setMessageGetStrategy(messageGetStrategy); + parserBolt.setOutputCollector(outputCollector); + parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{ + put("yafTopic", "yaf"); + }}); + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(new IllegalStateException("parserRunner.execute failed")) + .withSensorType(Collections.singleton("yaf")) + .addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8)); - } + parserBolt.execute(t1); - protected void buildGlobalConfig(ParserBolt parserBolt) { - HashMap<String, Object> globalConfig = new HashMap<>(); - Map<String, Object> fieldValidation = new HashMap<>(); - fieldValidation.put("input", Arrays.asList("field")); - fieldValidation.put("validation", "STELLAR"); - fieldValidation.put("config", new HashMap<String, String>(){{ put("condition", "field != 'invalidValue'"); }}); - globalConfig.put("fieldValidations", Arrays.asList(fieldValidation)); - parserBolt.getConfigurations().updateGlobalConfig(globalConfig); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), + argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); + verify(outputCollector, times(1)).reportError(any(IllegalStateException.class)); + verify(outputCollector, times(1)).ack(t1); } - private ParserBolt buildParserBolt(Map<String, ParserComponents> parserMap, - String csvWithFieldTransformations) { - return new ParserBolt("zookeeperUrl", parserMap) { - @Override - protected SensorParserConfig getSensorParserConfig(String sensorType) { - try { - return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + @Test + public void shouldThrowExceptionOnFailedWrite() throws Exception { + when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8)); + when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic"); + MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }}); + ParserConfigurations parserConfigurations = new ParserConfigurations(); + parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig()); + doThrow(new IllegalStateException("write failed")).when(writerHandler).write(any(), any(), any(), any(), any()); + + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{ + put("yaf", writerHandler); + }}) { @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(Optional.of(1)); + public ParserConfigurations getConfigurations() { + return parserConfigurations; } }; - } - private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) { - bolt.execute(t); - } + parserBolt.setMessageGetStrategy(messageGetStrategy); + parserBolt.setOutputCollector(outputCollector); + parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{ + put("yafTopic", "yaf"); + }}); + JSONObject message = new JSONObject(); + message.put("field", "value"); + mockParserRunner.setMessage(message); + + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(new IllegalStateException("write failed")) + .withSensorType(Collections.singleton("yaf")) + .addRawMessage("originalMessage".getBytes(StandardCharsets.UTF_8)); + parserBolt.execute(t1); + + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), + argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); + verify(outputCollector, times(1)).reportError(any(IllegalStateException.class)); + verify(outputCollector, times(1)).ack(t1); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/28542ad6/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java index 0d6eef8..31d87a7 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java @@ -25,21 +25,20 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; + import org.apache.commons.lang.SerializationUtils; -import org.apache.metron.common.configuration.FieldValidator; import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.utils.JSONUtils; -import org.apache.metron.common.utils.ReflectionUtils; import org.apache.metron.common.writer.MessageWriter; import org.apache.metron.integration.ProcessorResult; +import org.apache.metron.parsers.ParserRunner; +import org.apache.metron.parsers.ParserRunnerImpl; import org.apache.metron.parsers.bolt.ParserBolt; import org.apache.metron.parsers.bolt.WriterHandler; -import org.apache.metron.parsers.topology.ParserComponents; import org.apache.storm.task.OutputCollector; import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; @@ -83,41 +82,13 @@ public class ParserDriver implements Serializable { List<byte[]> errors = new ArrayList<>(); public ShimParserBolt(List<byte[]> output) { - super(null - , Collections.singletonMap( - sensorType == null ? config.getSensorTopic() : sensorType, - new ParserComponents( - ReflectionUtils.createInstance(config.getParserClassName()), - null, - new WriterHandler(new CollectingWriter(output)) - ) - ) - ); + super(null, parserRunner, Collections.singletonMap(sensorType, new WriterHandler(new CollectingWriter(output)))); this.output = output; - Map<String, ParserComponents> sensorToComponentMap = getSensorToComponentMap(); - for(Entry<String, ParserComponents> sensorToComponents : sensorToComponentMap.entrySet()) { - sensorToComponents.getValue().getMessageParser().configure(config.getParserConfig()); - } } @Override public ParserConfigurations getConfigurations() { - return new ParserConfigurations() { - @Override - public SensorParserConfig getSensorParserConfig(String sensorType) { - return config; - } - - @Override - public Map<String, Object> getGlobalConfig() { - return globalConfig; - } - - @Override - public List<FieldValidator> getFieldValidations() { - return new ArrayList<>(); - } - }; + return config; } @Override @@ -125,7 +96,7 @@ public class ParserDriver implements Serializable { } @Override - protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { + protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { errors.add(originalMessage); LOG.error("Error parsing message: " + ex.getMessage(), ex); } @@ -139,14 +110,20 @@ public class ParserDriver implements Serializable { } - private SensorParserConfig config; - private Map<String, Object> globalConfig; + private ParserConfigurations config; private String sensorType; + private ParserRunner parserRunner; public ParserDriver(String sensorType, String parserConfig, String globalConfig) throws IOException { - config = SensorParserConfig.fromBytes(parserConfig.getBytes()); - this.sensorType = sensorType; - this.globalConfig = JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER); + SensorParserConfig sensorParserConfig = SensorParserConfig.fromBytes(parserConfig.getBytes()); + this.sensorType = sensorType == null ? sensorParserConfig.getSensorTopic() : sensorType; + config = new ParserConfigurations(); + config.updateSensorParserConfig(this.sensorType, SensorParserConfig.fromBytes(parserConfig.getBytes())); + config.updateGlobalConfig(JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER)); + + parserRunner = new ParserRunnerImpl(new HashSet<String>() {{ + add(sensorType); + }}); } public ProcessorResult<List<byte[]>> run(Iterable<byte[]> in) { @@ -163,6 +140,7 @@ public class ParserDriver implements Serializable { public Tuple toTuple(byte[] record) { Tuple ret = mock(Tuple.class); + when(ret.getStringByField("topic")).thenReturn(sensorType); when(ret.getBinary(eq(0))).thenReturn(record); return ret; }