Repository: metron Updated Branches: refs/heads/master 7d554444e -> f072ed231
METRON-322 Global Batching and Flushing (mattf-horton) closes apache/metron#481 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/f072ed23 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/f072ed23 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/f072ed23 Branch: refs/heads/master Commit: f072ed231f9acdf0957f0244b6402b2ecd9403a0 Parents: 7d55444 Author: mattf-horton <mfo...@hortonworks.com> Authored: Thu Aug 10 17:38:59 2017 -0700 Committer: mattf <ma...@apache.org> Committed: Thu Aug 10 17:38:59 2017 -0700 ---------------------------------------------------------------------- .../configuration/IndexingConfigurations.java | 52 ++++- .../writer/IndexingWriterConfiguration.java | 11 ++ .../writer/ParserWriterConfiguration.java | 26 ++- .../writer/SingleBatchConfigurationFacade.java | 13 ++ .../writer/WriterConfiguration.java | 3 + .../apache/metron/common/system/FakeClock.java | 162 ++++++++++++++++ .../writer/IndexingWriterConfigurationTest.java | 30 ++- .../bolt/BulkMessageWriterBoltTest.java | 162 +++++++++++++--- metron-platform/metron-indexing/README.md | 19 +- .../main/config/zookeeper/indexing/test.json | 3 +- metron-platform/metron-management/README.md | 5 +- .../management/IndexingConfigFunctions.java | 8 +- .../management/IndexingConfigFunctionsTest.java | 12 +- .../SimpleHBaseEnrichmentWriterTest.java | 12 ++ .../test/bolt/BaseEnrichmentBoltTest.java | 6 +- .../metron/writer/BulkWriterComponent.java | 192 +++++++++++++++---- .../metron/writer/bolt/BatchTimeoutHelper.java | 182 ++++++++++++++++++ .../writer/bolt/BulkMessageWriterBolt.java | 140 ++++++++++++-- .../writer/bolt/BatchTimeoutHelperTest.java | 92 +++++++++ 19 files changed, 1039 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java index 9baf8be..1d60084 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java @@ -24,11 +24,11 @@ import org.apache.metron.common.utils.JSONUtils; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; +import java.util.*; public class IndexingConfigurations extends Configurations { public static final String BATCH_SIZE_CONF = "batchSize"; + public static final String BATCH_TIMEOUT_CONF = "batchTimeout"; public static final String ENABLED_CONF = "enabled"; public static final String INDEX_CONF = "index"; public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction"; @@ -74,7 +74,39 @@ public class IndexingConfigurations extends Configurations { } public int getBatchSize(String sensorName, String writerName ) { - return getBatchSize(getSensorIndexingConfig(sensorName, writerName)); + return getBatchSize(getSensorIndexingConfig(sensorName, writerName)); + } + + public int getBatchTimeout(String sensorName, String writerName ) { + return getBatchTimeout(getSensorIndexingConfig(sensorName, writerName)); + } + + /** + * Returns all configured values of batchTimeout, for all configured sensors, + * but only for the specific writer identified by {@param writerName}. 
So, if it is + * an hdfs writer, it will return the batchTimeouts for hdfs writers for all the sensors. + * The goal is to return to a {@link org.apache.metron.common.bolt.ConfiguredBolt} + * the set of all and only batchTimeouts relevant to that ConfiguredBolt. + * + * @param writerName + * @return list of integer batchTimeouts, one per configured sensor + */ + public List<Integer> getAllConfiguredTimeouts(String writerName) { + // The configuration infrastructure was not designed to enumerate sensors, so we synthesize. + // Since getKey is in this same class, we know we can pass it a null string to get the key prefix + // for all sensor types within this capability. We then enumerate all keys in configurations.keySet + // and select those that match the key prefix, as being sensor keys. The suffix substring of + // each such key is used as a sensor name to query the batchTimeout settings, if any. + String keyPrefixString = getKey(""); + int prefixStringLength = keyPrefixString.length(); + List<Integer> configuredBatchTimeouts = new ArrayList<>(); + for (String sensorKeyString : configurations.keySet()) { + if (sensorKeyString.startsWith(keyPrefixString)) { + String configuredSensorName = sensorKeyString.substring(prefixStringLength); + configuredBatchTimeouts.add(getBatchTimeout(configuredSensorName, writerName)); + } + } + return configuredBatchTimeouts; } public String getIndex(String sensorName, String writerName) { @@ -105,6 +137,14 @@ public class IndexingConfigurations extends Configurations { ); } + public static int getBatchTimeout(Map<String, Object> conf) { + return getAs( BATCH_TIMEOUT_CONF + ,conf + , 0 + , Integer.class + ); + } + public static String getIndex(Map<String, Object> conf, String sensorName) { return getAs( INDEX_CONF ,conf @@ -132,6 +172,12 @@ public class IndexingConfigurations extends Configurations { return ret; } + public static Map<String, Object> setBatchTimeout(Map<String, Object> conf, int batchTimeout) { + Map<String, Object> 
ret = conf == null?new HashMap<>():conf; + ret.put(BATCH_TIMEOUT_CONF, batchTimeout); + return ret; + } + public static Map<String, Object> setIndex(Map<String, Object> conf, String index) { Map<String, Object> ret = conf == null?new HashMap<>():conf; ret.put(INDEX_CONF, index); http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java index b6b2a4b..beb9373 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java @@ -20,6 +20,7 @@ package org.apache.metron.common.configuration.writer; import org.apache.metron.common.configuration.IndexingConfigurations; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -38,6 +39,16 @@ public class IndexingWriterConfiguration implements WriterConfiguration{ } @Override + public int getBatchTimeout(String sensorName) { + return config.orElse(new IndexingConfigurations()).getBatchTimeout(sensorName, writerName); + } + + @Override + public List<Integer> getAllConfiguredTimeouts() { + return config.orElse(new IndexingConfigurations()).getAllConfiguredTimeouts(writerName); + } + + @Override public String getIndex(String sensorName) { return config.orElse(new IndexingConfigurations()).getIndex(sensorName, writerName); } 
http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java index 821ef4a..ae74c65 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java @@ -22,6 +22,8 @@ import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.stellar.common.utils.ConversionUtils; +import java.util.ArrayList; +import java.util.List; import java.util.Map; public class ParserWriterConfiguration implements WriterConfiguration { @@ -32,9 +34,9 @@ public class ParserWriterConfiguration implements WriterConfiguration { @Override public int getBatchSize(String sensorName) { if(config != null - && config.getSensorParserConfig(sensorName) != null - && config.getSensorParserConfig(sensorName).getParserConfig() != null - ) { + && config.getSensorParserConfig(sensorName) != null + && config.getSensorParserConfig(sensorName).getParserConfig() != null + ) { Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.BATCH_SIZE_CONF); return batchObj == null ? 
1 : ConversionUtils.convert(batchObj, Integer.class); } @@ -42,6 +44,24 @@ public class ParserWriterConfiguration implements WriterConfiguration { } @Override + public int getBatchTimeout(String sensorName) { + if(config != null + && config.getSensorParserConfig(sensorName) != null + && config.getSensorParserConfig(sensorName).getParserConfig() != null + ) { + Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.BATCH_TIMEOUT_CONF); + return batchObj == null ? 0 : ConversionUtils.convert(batchObj, Integer.class); + } + return 0; + } + + @Override + public List<Integer> getAllConfiguredTimeouts() { + // TODO - stub implementation pending METRON-750 + return new ArrayList<Integer>(); + } + + @Override public String getIndex(String sensorName) { if(config != null && config.getSensorParserConfig(sensorName) != null && config.getSensorParserConfig(sensorName).getParserConfig() != null http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java index 69e5541..e50bd2b 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java @@ -18,6 +18,8 @@ package org.apache.metron.common.configuration.writer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; public class SingleBatchConfigurationFacade implements WriterConfiguration { @@ -32,6 
+34,17 @@ public class SingleBatchConfigurationFacade implements WriterConfiguration { } @Override + public int getBatchTimeout(String sensorName) { + return 0; + } + + @Override + public List<Integer> getAllConfiguredTimeouts() { + // null implementation since batching is disabled + return new ArrayList<Integer>(); + } + + @Override public String getIndex(String sensorName) { return config.getIndex(sensorName); } http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java index 45271e8..2354f95 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java @@ -19,10 +19,13 @@ package org.apache.metron.common.configuration.writer; import java.io.Serializable; +import java.util.List; import java.util.Map; public interface WriterConfiguration extends Serializable { int getBatchSize(String sensorName); + int getBatchTimeout(String sensorName); + List<Integer> getAllConfiguredTimeouts(); String getIndex(String sensorName); boolean isEnabled(String sensorName); Map<String, Object> getSensorConfig(String sensorName); http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/FakeClock.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/FakeClock.java 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/FakeClock.java new file mode 100644 index 0000000..e34a26d --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/FakeClock.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.system; + +import java.util.concurrent.TimeUnit; + +/** + * A fake clock for test purposes, that starts out at time zero (epoch), and + * never advances itself, but allows you to increment it by any desired amount. + * + * Note that the base class is not the Java 8 Clock, but rather the Clock we + * defined in {@link org.apache.metron.common.system.Clock}. Fundamental units of time + * are milliseconds. + * + * Three exceptions are also defined: {@link IllegalArgumentClockNegative}, + * {@link IllegalArgumentClockZero}, and {@link IllegalArgumentClockOverflow}. + * These are thrown in various circumstances that imply the FakeClock is + * being used outside of its design intent. They are subclasses of IllegalArgumentException, + * hence unchecked.
+ */ +public class FakeClock extends Clock { + private long now_ms = 0; + + @Override + public long currentTimeMillis() { + return now_ms; + } + + /** + * Advance the fake clock by a number of milliseconds. + * @param duration_ms + * + * @throws IllegalArgumentClockNegative (unchecked) if you try to go backwards in time. + * This is not an allowed behavior, because most system clocks go to great + * effort to make sure it never happens, even with, e.g., anomalous events + * from a bad NTP server. + * If we really get a demand for this capability, we'll add methods that don't + * check for this. + * @throws IllegalArgumentClockOverflow (unchecked) if you try to add a duration + * that would overflow the Long value of {@currentTimeMillis} + */ + public void elapseMillis(long duration_ms) { + long instant_ms = now_ms + duration_ms; + if (duration_ms < 0) { + throw new IllegalArgumentClockNegative(String.format( + "Attempted to move backward in time, by %d milliseconds." + , duration_ms)); + } + else if (instant_ms < 0) { + throw new IllegalArgumentClockOverflow(String.format( + "Attempted to advance beyond the edge of time, to epoch %d + %d." + , now_ms, duration_ms)); + } + now_ms = instant_ms; + } + + /** + * Advance the fake clock by a number of seconds. + * See {@elapseMillis} for details. + * + * @param duration_secs + */ + public void elapseSeconds(long duration_secs) { + elapseMillis(TimeUnit.SECONDS.toMillis(duration_secs)); + } + + /** + * Advance the fake clock to a point in time specified as milliseconds after 0. + * @param instant_ms - epoch time in milliseconds + * + * @throws IllegalArgumentClockNegative (unchecked) if you try to go backwards in time. + * This is not an allowed behavior, because most system clocks go to great + * effort to make sure it never happens, even with, e.g., anomalous events + * from a bad NTP server. + * If we really get a demand for this capability, we'll add methods that don't + * check for this. 
+ * @throws IllegalArgumentClockZero (unchecked) if you try to "advance" the clock to the time it already is. + * Why? Because it implies your test code has lost track of previous increments, + * which might be problematic, so we do this in the spirit of "fail fast". + * If you *meant* to lose track, for instance if you were using random numbers of events, + * or whatever, you can always orient yourself in time by reading {@currentTimeMillis}. + */ + public void advanceToMillis(long instant_ms) { + if (instant_ms < now_ms) { + throw new IllegalArgumentClockNegative(String.format( + "Attempted to move backward in time, from epoch %d to %d." + , now_ms, instant_ms)); + } + if (instant_ms == now_ms) { + throw new IllegalArgumentClockZero(String.format( + "Time was set to current time, with null advance, at epoch %d." + , now_ms)); + } + now_ms = instant_ms; + } + + /** + * Advance the fake clock to a point in time specified as seconds after 0. + * See {@advanceToMillis} for details. + * + * @param instant_secs - epoch time in seconds + */ + public void advanceToSeconds(long instant_secs) { + advanceToMillis(TimeUnit.SECONDS.toMillis(instant_secs)); + } + + /** + * IllegalArgumentClockNegative (unchecked) is thrown if you try to go backwards in time. + * This is not an allowed behavior, because most system clocks go to great + * effort to make sure it never happens, even with, e.g., anomalous events + * from a bad NTP server. + * If we really get a demand for this capability, we'll add methods that don't + * check for this. + */ + public static class IllegalArgumentClockNegative extends IllegalArgumentException { + public IllegalArgumentClockNegative(String s) { + super(s); + } + } + + /** + * IllegalArgumentClockZero (unchecked) is thrown if you try to "advance" the clock to the time it already is. + * Why? Because it implies your test code has lost track of previous increments, + * which might be problematic, so we do this in the spirit of "fail fast". 
+ * If you *meant* to lose track, for instance if you were using random numbers of events, + * or whatever, you can always orient yourself in time by reading {@link #currentTimeMillis()}. + * + * Note that this argument does not apply to elapseMillis(0), so it does not throw + * this exception. + */ + public static class IllegalArgumentClockZero extends IllegalArgumentException { + public IllegalArgumentClockZero(String s) { + super(s); + } + } + + /** + * IllegalArgumentClockOverflow (unchecked) is thrown if you try to add a duration + * that would overflow the Long value of {@link #currentTimeMillis()} + */ + public static class IllegalArgumentClockOverflow extends IllegalArgumentException { + public IllegalArgumentClockOverflow(String s) { + super(s); + } + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java index 94da965..aec215f 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java @@ -18,12 +18,18 @@ package org.apache.metron.common.writer; -import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; import org.junit.Assert; import org.junit.Test; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; + +import static 
org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sampleSensorIndexingConfigPath; +import static org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sensorType; + public class IndexingWriterConfigurationTest { @Test public void testDefaultBatchSize() { @@ -33,6 +39,28 @@ public class IndexingWriterConfigurationTest { Assert.assertEquals(1, config.getBatchSize("foo")); } @Test + public void testDefaultBatchTimeout() { + IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", + new IndexingConfigurations() + ); + Assert.assertEquals(0, config.getBatchTimeout("foo")); + } + @Test + public void testGetAllConfiguredTimeouts() throws FileNotFoundException, IOException { + //default + IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", + new IndexingConfigurations() + ); + Assert.assertEquals(0, config.getAllConfiguredTimeouts().size()); + //non-default + IndexingConfigurations iconfigs = new IndexingConfigurations(); + iconfigs.updateSensorIndexingConfig( + sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); + config = new IndexingWriterConfiguration("elasticsearch", iconfigs); + Assert.assertEquals(1, config.getAllConfiguredTimeouts().size()); + Assert.assertEquals(7, (long)config.getAllConfiguredTimeouts().get(0)); + } + @Test public void testDefaultIndex() { IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", new IndexingConfigurations() http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index 79d8285..5f6f22f 100644 --- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -17,21 +17,22 @@ */ package org.apache.metron.enrichment.bolt; +import org.adrianwalker.multilinestring.Multiline; import org.apache.log4j.Level; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.message.MessageGetters; +import org.apache.metron.common.system.FakeClock; +import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.metron.test.bolt.BaseEnrichmentBoltTest; import org.apache.metron.test.utils.UnitTestHelper; import org.apache.metron.writer.BulkWriterComponent; +import org.apache.metron.writer.bolt.BulkMessageWriterBolt; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; -import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.Constants; -import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.test.bolt.BaseEnrichmentBoltTest; -import org.apache.metron.common.writer.BulkMessageWriter; -import org.apache.metron.writer.bolt.BulkMessageWriterBolt; import org.hamcrest.Description; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -48,15 +49,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.reset; -import static 
org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { @@ -93,23 +91,24 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { private JSONObject sampleMessage; private List<JSONObject> messageList; + private List<JSONObject> fullMessageList; private List<Tuple> tupleList; @Before public void parseMessages() throws ParseException { JSONParser parser = new JSONParser(); + fullMessageList = new ArrayList<>(); sampleMessage = (JSONObject) parser.parse(sampleMessageString); sampleMessage.put("field", "value1"); - messageList = new ArrayList<>(); - messageList.add(((JSONObject) sampleMessage.clone())); + fullMessageList.add(((JSONObject) sampleMessage.clone())); sampleMessage.put("field", "value2"); - messageList.add(((JSONObject) sampleMessage.clone())); + fullMessageList.add(((JSONObject) sampleMessage.clone())); sampleMessage.put("field", "value3"); - messageList.add(((JSONObject) sampleMessage.clone())); + fullMessageList.add(((JSONObject) sampleMessage.clone())); sampleMessage.put("field", "value4"); - messageList.add(((JSONObject) sampleMessage.clone())); + fullMessageList.add(((JSONObject) sampleMessage.clone())); sampleMessage.put("field", "value5"); - messageList.add(((JSONObject) sampleMessage.clone())); + fullMessageList.add(((JSONObject) sampleMessage.clone())); } @Mock @@ -119,14 +118,17 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { private MessageGetStrategy messageGetStrategy; @Test - public void test() throws Exception { + public void testFlushOnBatchSize() throws Exception { BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") - .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()).withMessageGetterField("message"); + 
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) + .withMessageGetterField("message"); bulkMessageWriterBolt.setCuratorFramework(client); bulkMessageWriterBolt.setTreeCache(cache); - bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); + bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, + new FileInputStream(sampleSensorIndexingConfigPath)); bulkMessageWriterBolt.declareOutputFields(declarer); - verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message"))); + verify(declarer, times(1)).declareStream(eq("error"), argThat( + new FieldsMatcher("message"))); Map stormConf = new HashMap(); doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class)); try { @@ -138,23 +140,31 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector); verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class)); tupleList = new ArrayList<>(); + messageList = new ArrayList<>(); for(int i = 0; i < 4; i++) { - when(tuple.getValueByField("message")).thenReturn(messageList.get(i)); + when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(i)); tupleList.add(tuple); + messageList.add(fullMessageList.get(i)); bulkMessageWriterBolt.execute(tuple); - verify(bulkMessageWriter, times(0)).write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList), eq(messageList)); + verify(bulkMessageWriter, times(0)).write(eq(sensorType) + , any(WriterConfiguration.class), eq(tupleList), eq(messageList)); } - when(tuple.getValueByField("message")).thenReturn(messageList.get(4)); + when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(4)); tupleList.add(tuple); + 
messageList.add(fullMessageList.get(4)); BulkWriterResponse response = new BulkWriterResponse(); response.addAllSuccesses(tupleList); - when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList), argThat(new MessageListMatcher(messageList)))).thenReturn(response); + when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList) + , argThat(new MessageListMatcher(messageList)))).thenReturn(response); bulkMessageWriterBolt.execute(tuple); - verify(bulkMessageWriter, times(1)).write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList), argThat(new MessageListMatcher(messageList))); + verify(bulkMessageWriter, times(1)).write(eq(sensorType) + , any(WriterConfiguration.class), eq(tupleList) + , argThat(new MessageListMatcher(messageList))); verify(outputCollector, times(5)).ack(tuple); reset(outputCollector); - doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), any(WriterConfiguration.class), Matchers.anyListOf(Tuple.class), Matchers.anyListOf(JSONObject.class)); - when(tuple.getValueByField("message")).thenReturn(messageList.get(0)); + doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), any(WriterConfiguration.class) + , Matchers.anyListOf(Tuple.class), Matchers.anyListOf(JSONObject.class)); + when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(0)); UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.FATAL); for(int i = 0; i < 5; i++) { bulkMessageWriterBolt.execute(tuple); @@ -164,4 +174,100 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class)); verify(outputCollector, times(1)).reportError(any(Throwable.class)); } + + @Test + public void testFlushOnBatchTimeout() throws Exception { + FakeClock clock = new FakeClock(); + BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") + 
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) + .withMessageGetterField("message").withBatchTimeoutDivisor(3); + bulkMessageWriterBolt.setCuratorFramework(client); + bulkMessageWriterBolt.setTreeCache(cache); + bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, + new FileInputStream(sampleSensorIndexingConfigPath)); + bulkMessageWriterBolt.declareOutputFields(declarer); + verify(declarer, times(1)).declareStream(eq("error") + , argThat(new FieldsMatcher("message"))); + Map stormConf = new HashMap(); + when(bulkMessageWriter.getName()).thenReturn("elasticsearch"); + bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, clock); + verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class)); + int batchTimeout = bulkMessageWriterBolt.getDefaultBatchTimeout(); + assertEquals(4, batchTimeout); + tupleList = new ArrayList<>(); + messageList = new ArrayList<>(); + for(int i = 0; i < 3; i++) { + when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(i)); + tupleList.add(tuple); + messageList.add(fullMessageList.get(i)); + bulkMessageWriterBolt.execute(tuple); + verify(bulkMessageWriter, times(0)).write(eq(sensorType) + , any(WriterConfiguration.class), eq(tupleList), eq(messageList)); + } + clock.elapseSeconds(5); + when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(3)); + tupleList.add(tuple); + messageList.add(fullMessageList.get(3)); + BulkWriterResponse response = new BulkWriterResponse(); + response.addAllSuccesses(tupleList); + when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList) + , argThat(new MessageListMatcher(messageList)))).thenReturn(response); + bulkMessageWriterBolt.execute(tuple); + verify(bulkMessageWriter, times(1)).write(eq(sensorType) + , any(WriterConfiguration.class) + , eq(tupleList), argThat(new 
MessageListMatcher(messageList))); + verify(outputCollector, times(4)).ack(tuple); + } + + @Test + public void testFlushOnTickTuple() throws Exception { + FakeClock clock = new FakeClock(); + BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") + .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) + .withMessageGetterField("message"); + bulkMessageWriterBolt.setCuratorFramework(client); + bulkMessageWriterBolt.setTreeCache(cache); + bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType + , new FileInputStream(sampleSensorIndexingConfigPath)); + bulkMessageWriterBolt.declareOutputFields(declarer); + verify(declarer, times(1)).declareStream(eq("error") + , argThat(new FieldsMatcher("message"))); + Map stormConf = new HashMap(); + when(bulkMessageWriter.getName()).thenReturn("elasticsearch"); + bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, clock); + verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class) + , any(WriterConfiguration.class)); + int batchTimeout = bulkMessageWriterBolt.getDefaultBatchTimeout(); + assertEquals(14, batchTimeout); + tupleList = new ArrayList<>(); + messageList = new ArrayList<>(); + for(int i = 0; i < 3; i++) { + when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(i)); + tupleList.add(tuple); + messageList.add(fullMessageList.get(i)); + bulkMessageWriterBolt.execute(tuple); + verify(bulkMessageWriter, times(0)).write(eq(sensorType) + , any(WriterConfiguration.class), eq(tupleList), eq(messageList)); + } + when(tuple.getValueByField("message")).thenReturn(null); + when(tuple.getSourceComponent()).thenReturn("__system"); //mark the tuple as a TickTuple, part 1 of 2 + when(tuple.getSourceStreamId()).thenReturn("__tick"); //mark the tuple as a TickTuple, part 2 of 2 + BulkWriterResponse response = new BulkWriterResponse(); + response.addAllSuccesses(tupleList); + 
when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList) + , argThat(new MessageListMatcher(messageList)))).thenReturn(response); + clock.advanceToSeconds(2); + bulkMessageWriterBolt.execute(tuple); + verify(bulkMessageWriter, times(0)).write(eq(sensorType) + , any(WriterConfiguration.class) + , eq(tupleList), argThat(new MessageListMatcher(messageList))); + verify(outputCollector, times(1)).ack(tuple); // 1 tick + clock.advanceToSeconds(9); + bulkMessageWriterBolt.execute(tuple); + verify(bulkMessageWriter, times(1)).write(eq(sensorType) + , any(WriterConfiguration.class) + , eq(tupleList), argThat(new MessageListMatcher(messageList))); + assertEquals(3, tupleList.size()); + verify(outputCollector, times(5)).ack(tuple); // 3 messages + 2nd tick + } } http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-indexing/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md index 2095d0f..aea670c 100644 --- a/metron-platform/metron-indexing/README.md +++ b/metron-platform/metron-indexing/README.md @@ -11,9 +11,9 @@ from the enrichment topology that have been enriched and storing the data in one By default, this topology writes out to both HDFS and one of Elasticsearch and Solr. -Indices are written in batch and the batch size is specified in the -[Sensor Indexing Configuration](#sensor-indexing-configuration) via the `batchSize` parameter. -This config is variable by sensor type. +Indices are written in batch and the batch size and batch timeout are specified in the +[Sensor Indexing Configuration](#sensor-indexing-configuration) via the `batchSize` and `batchTimeout` parameters. +These configs are variable by sensor type. ## Indexing Architecture @@ -40,7 +40,10 @@ elasticsearch or solr and hdfs writers running. 
The configuration for an individual writer-specific configuration is a JSON map with the following fields: * `index` : The name of the index to write to (defaulted to the name of the sensor). -* `batchSize` : The size of the batch that is written to the indices at once (defaulted to `1`). +* `batchSize` : The size of the batch that is written to the indices at once. Defaults to `1` (no batching). +* `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met. Optional. +If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm +parameter `topology.message.timeout.secs`. Ignored if batchSize is `1`, since this disables batching. * `enabled` : Whether the writer is enabled (default `true`). ### Indexing Configuration Examples @@ -56,10 +59,12 @@ or no file at all. * elasticsearch writer * enabled * batch size of 1 + * batch timeout system default * index name the same as the sensor * hdfs writer * enabled * batch size of 1 + * batch timeout system default * index name the same as the sensor If a writer config is unspecified, then a warning is indicated in the @@ -72,11 +77,13 @@ Storm console. e.g.: "elasticsearch": { "index": "foo", "batchSize" : 100, + "batchTimeout" : 0, "enabled" : true }, "hdfs": { "index": "foo", "batchSize": 1, + "batchTimeout" : 0, "enabled" : true } } @@ -84,10 +91,12 @@ Storm console. e.g.: * elasticsearch writer * enabled * batch size of 100 + * batch timeout system default * index name of "foo" * hdfs writer * enabled * batch size of 1 + * batch timeout system default * index name of "foo" #### HDFS Writer turned off @@ -100,6 +109,7 @@ Storm console. e.g.: "hdfs": { "index": "foo", "batchSize": 100, + "batchTimeout" : 0, "enabled" : false } } @@ -107,6 +117,7 @@ Storm console. 
e.g.: * elasticsearch writer * enabled * batch size of 1 + * batch timeout system default * index name of "foo" * hdfs writer * disabled http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json index 0197f0c..17ed56c 100644 --- a/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json +++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json @@ -6,7 +6,8 @@ }, "elasticsearch" : { "index": "yaf", - "batchSize": 5, + "batchSize": 25, + "batchTimeout": 7, "enabled" : true }, "solr" : { http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-management/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/README.md b/metron-platform/metron-management/README.md index d256885..07c6908 100644 --- a/metron-platform/metron-management/README.md +++ b/metron-platform/metron-management/README.md @@ -23,7 +23,7 @@ The functions are split roughly into a few sections: * File functions - Functions around interacting with local or HDFS files * Configuration functions - Functions surrounding pulling and pushing configs from zookeeper * Parser functions - Functions surrounding adding, viewing, and removing Parser functions. 
-* Enrichment functions - Functions surrounding adding, viewing and removing Stellar enrichments as well as managing batch size and index names for the enrichment topology configuration +* Enrichment functions - Functions surrounding adding, viewing and removing Stellar enrichments as well as managing batch size, batch timeout, and index names for the enrichment topology configuration * Threat Triage functions - Functions surrounding adding, viewing and removing threat triage functions. ### Grok Functions @@ -169,11 +169,12 @@ The functions are split roughly into a few sections: ### Indexing Functions * `INDEXING_SET_BATCH` - * Description: Set batch size + * Description: Set batch size and timeout * Input: * sensorConfig - Sensor config to add transformation to. * writer - The writer to update (e.g. elasticsearch, solr or hdfs) * size - batch size (integer), defaults to 1, meaning batching disabled + * timeout - (optional) batch timeout in seconds (integer), defaults to 0, meaning system default * Returns: The String representation of the config in zookeeper * `INDEXING_SET_ENABLED` * Description: Enable or disable an indexing writer for a sensor. 
http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java index 128daea..235044f 100644 --- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java @@ -40,10 +40,11 @@ public class IndexingConfigFunctions { @Stellar( namespace = "INDEXING" ,name = "SET_BATCH" - ,description = "Set batch size" + ,description = "Set batch size and timeout" ,params = {"sensorConfig - Sensor config to add transformation to." ,"writer - The writer to update (e.g. 
elasticsearch, solr or hdfs)" ,"size - batch size (integer), defaults to 1, meaning batching disabled" + ,"timeout - (optional) batch timeout in seconds (integer), defaults to 0, meaning system default" } ,returns = "The String representation of the config in zookeeper" ) @@ -78,6 +79,11 @@ public class IndexingConfigFunctions { } } configObj.put(writer, IndexingConfigurations.setBatchSize((Map<String, Object>) configObj.get(writer), batchSize)); + int batchTimeout = 0; + if(args.size() > 3) { + batchTimeout = ConversionUtils.convert(args.get(i++), Integer.class); + } + configObj.put(writer, IndexingConfigurations.setBatchTimeout((Map<String, Object>) configObj.get(writer), batchTimeout)); try { return JSONUtils.INSTANCE.toJSON(configObj, true); } catch (JsonProcessingException e) { http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java index bfed827..29c80b8 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java @@ -62,7 +62,17 @@ public class IndexingConfigFunctionsTest { , toMap("config", "{}") ); Map<String, Object> config = (Map<String, Object>)INDEXING.deserialize(out); - Assert.assertEquals(IndexingConfigurations.getBatchSize((Map<String, Object>) config.get("hdfs")), 10); + Assert.assertEquals(10, IndexingConfigurations.getBatchSize((Map<String, Object>) config.get("hdfs"))); + } + + @Test + public void testSetBatchWithTimeout() { + String out = (String) 
run("INDEXING_SET_BATCH(config, 'hdfs', 10, 2)" + , toMap("config", "{}") + ); + Map<String, Object> config = (Map<String, Object>)INDEXING.deserialize(out); + Assert.assertEquals(10, IndexingConfigurations.getBatchSize((Map<String, Object>) config.get("hdfs"))); + Assert.assertEquals(2, IndexingConfigurations.getBatchTimeout((Map<String, Object>) config.get("hdfs"))); } @Test(expected=IllegalStateException.class) http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java index 5f4b3fd..15c84da 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java @@ -159,6 +159,18 @@ public class SimpleHBaseEnrichmentWriterTest { } @Override + public int getBatchTimeout(String sensorName) { + //TODO - enable unit testing + return 0; + } + + @Override + public List<Integer> getAllConfiguredTimeouts() { + //TODO - enable unit testing + return new ArrayList<>(); + } + + @Override public String getIndex(String sensorName) { return SENSOR_TYPE; } http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java 
b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java index 8e351ff..f270d97 100644 --- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java +++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java @@ -29,12 +29,12 @@ import java.util.Set; public class BaseEnrichmentBoltTest extends BaseBoltTest { - public String sampleSensorEnrichmentConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "enrichments/test.json"; - public String sampleSensorIndexingConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "indexing/test.json"; + public static final String sampleSensorEnrichmentConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "enrichments/test.json"; + public static final String sampleSensorIndexingConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "indexing/test.json"; protected Set<String> streamIds = new HashSet<>(); protected Set<String> joinStreamIds = new HashSet<>(); protected String key = "someKey"; - protected String sensorType = "test"; + public static final String sensorType = "test"; /** * { http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index df6ee90..37c624f 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -23,6 +23,7 @@ import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; import 
org.apache.metron.common.error.MetronError; import org.apache.metron.common.message.MessageGetStrategy; +import org.apache.metron.common.system.Clock; import org.apache.metron.common.utils.ErrorUtils; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; @@ -38,15 +39,51 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; +/** + * This component implements message batching, with both flush on queue size, and flush on queue timeout. + * There is a queue for each sensorType. + * Ideally each queue would have its own timer, but we only have one global timer, the Tick Tuple + * generated at fixed intervals by the system and received by the Bolt. Given this constraint, + * we use the following strategy: + * - The default batchTimeout is, as recommended by Storm, 1/2 the Storm 'topology.message.timeout.secs', + * modified by batchTimeoutDivisor, in case multiple batching writers are daisy-chained in one topology. + * - If some sensors configure their own batchTimeouts, they are compared with the default. Batch + * timeouts greater than the default will be ignored, because they can cause message recycling in Storm. + * Batch timeouts configured to <= zero, or undefined, mean use the default. + * - The *smallest* configured batchTimeout among all sensor types, greater than zero and less than + * the default, will be used to configure the 'topology.tick.tuple.freq.secs' for the Bolt. If there are no + * valid configured batchTimeouts, the defaultBatchTimeout will be used. + * - The age of the queue is checked every time a sensor message arrives. Thus, if at least one message + * per second is received for a given sensor, that queue will flush on timeout or sooner, depending on batchSize. 
+ * - On each Tick Tuple received, *all* queues will be checked, and if any are older than their respective + * batchTimeout, they will be flushed. Note that this does NOT guarantee timely flushing, depending on the + * phase relationship between the queue's batchTimeout and the tick interval. The maximum age of a queue + * before it flushes is its batchTimeout + the tick interval, which is guaranteed to be less than 2x the + * batchTimeout, and also less than the 'topology.message.timeout.secs'. This guarantees that the messages + * will not age out of the Storm topology, but it does not guarantee the flush interval requested, for + * sensor types not receiving at least one message every second. + * + * @param <MESSAGE_T> + */ public class BulkWriterComponent<MESSAGE_T> { public static final Logger LOG = LoggerFactory .getLogger(BulkWriterComponent.class); private Map<String, Collection<Tuple>> sensorTupleMap = new HashMap<>(); private Map<String, List<MESSAGE_T>> sensorMessageMap = new HashMap<>(); + private Map<String, long[]> batchTimeoutMap = new HashMap<>(); private OutputCollector collector; + //In test scenarios, defaultBatchTimeout may not be correctly initialized, so do it here. + //This is a conservative defaultBatchTimeout for a vanilla bolt with batchTimeoutDivisor=2 + public static final int UNINITIALIZED_DEFAULT_BATCH_TIMEOUT = 6; + private int defaultBatchTimeout = UNINITIALIZED_DEFAULT_BATCH_TIMEOUT; private boolean handleCommit = true; private boolean handleError = true; + private static final int LAST_CREATE_TIME_MS = 0; //index zero'th element of long[] in batchTimeoutMap + private static final int TIMEOUT_MS = 1; //index next element of long[] in batchTimeoutMap + private Clock clock = new Clock(); + public BulkWriterComponent(OutputCollector collector) { this.collector = collector; } @@ -57,6 +94,15 @@ public class BulkWriterComponent<MESSAGE_T> { this.handleError = handleError; } + /** + * Used only for testing. 
Overrides the default (actual) wall clock. + * @return this mutated BulkWriterComponent + */ + public BulkWriterComponent withClock(Clock clock) { + this.clock = clock; + return this; + } + public void commit(Iterable<Tuple> tuples) { tuples.forEach(t -> collector.ack(t)); if(LOG.isDebugEnabled()) { @@ -92,7 +138,6 @@ public class BulkWriterComponent<MESSAGE_T> { return new ArrayList<>(); } - public void errorAll(Throwable e, MessageGetStrategy messageGetStrategy) { for(String key : new HashSet<>(sensorTupleMap.keySet())) { errorAll(key, e, messageGetStrategy); @@ -114,55 +159,136 @@ public class BulkWriterComponent<MESSAGE_T> { , MessageGetStrategy messageGetStrategy ) throws Exception { - if(!configurations.isEnabled(sensorType)) { + if (!configurations.isEnabled(sensorType)) { collector.ack(tuple); return; } int batchSize = configurations.getBatchSize(sensorType); + + if (batchSize <= 1) { //simple case - no batching, no timeouts + Collection<Tuple> tupleList = sensorTupleMap.get(sensorType); //still read in case batchSize changed + if (tupleList == null) { + tupleList = createTupleCollection(); + } + tupleList.add(tuple); + + List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType); //still read in case batchSize changed + if (messageList == null) { + messageList = new ArrayList<>(); + } + messageList.add(message); + + flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList); + return; + } + + //Otherwise do the full batch buffering with timeouts + long[] batchTimeoutInfo = batchTimeoutMap.get(sensorType); + if (batchTimeoutInfo == null) { + //lazily create the batchTimeoutInfo array, once per sensor. + batchTimeoutInfo = new long[] {0L, 0L}; + batchTimeoutMap.put(sensorType, batchTimeoutInfo); + } + Collection<Tuple> tupleList = sensorTupleMap.get(sensorType); if (tupleList == null) { + //This block executes at the beginning of every batch, per sensor. 
tupleList = createTupleCollection(); + sensorTupleMap.put(sensorType, tupleList); + batchTimeoutInfo[LAST_CREATE_TIME_MS] = clock.currentTimeMillis(); + //configurations can change, so (re)init getBatchTimeout(sensorType) at start of every batch + int batchTimeoutSecs = configurations.getBatchTimeout(sensorType); + if (batchTimeoutSecs <= 0 || batchTimeoutSecs > defaultBatchTimeout) { + batchTimeoutSecs = defaultBatchTimeout; + } + batchTimeoutInfo[TIMEOUT_MS] = TimeUnit.SECONDS.toMillis(batchTimeoutSecs); } tupleList.add(tuple); + List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType); if (messageList == null) { messageList = new ArrayList<>(); + sensorMessageMap.put(sensorType, messageList); } messageList.add(message); - if (tupleList.size() < batchSize) { - sensorTupleMap.put(sensorType, tupleList); - sensorMessageMap.put(sensorType, messageList); - } else { - long startTime = System.nanoTime(); - try { - BulkWriterResponse response = bulkMessageWriter.write(sensorType, configurations, tupleList, messageList); - - // Commit or error piecemeal. - if(handleCommit) { - commit(response); - } - - if(handleError) { - error(sensorType, response, messageGetStrategy); - } else if (response.hasErrors()) { - throw new IllegalStateException("Unhandled bulk errors in response: " + response.getErrors()); - } - } catch (Throwable e) { - if(handleError) { - error(sensorType, e, tupleList, messageGetStrategy); - } - else { - throw e; - } + //Check for batchSize flush + if (tupleList.size() >= batchSize) { + flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList); + return; + } + //Check for batchTimeout flush (if the tupleList isn't brand new). + //Debugging note: If your queue always flushes at length==2 regardless of feed rate, + //it may mean defaultBatchTimeout has somehow been set to zero. 
+ if (tupleList.size() > 1 && (clock.currentTimeMillis() - batchTimeoutInfo[LAST_CREATE_TIME_MS] >= batchTimeoutInfo[TIMEOUT_MS])) { + flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList); + return; + } + } + + private void flush( String sensorType + , BulkMessageWriter<MESSAGE_T> bulkMessageWriter + , WriterConfiguration configurations + , MessageGetStrategy messageGetStrategy + , Collection<Tuple> tupleList + , List<MESSAGE_T> messageList + ) throws Exception + { + long startTime = System.currentTimeMillis(); //no need to mock, so use real time + try { + BulkWriterResponse response = bulkMessageWriter.write(sensorType, configurations, tupleList, messageList); + + // Commit or error piecemeal. + if(handleCommit) { + commit(response); } - finally { - sensorTupleMap.remove(sensorType); - sensorMessageMap.remove(sensorType); + + if(handleError) { + error(sensorType, response, messageGetStrategy); + } else if (response.hasErrors()) { + throw new IllegalStateException("Unhandled bulk errors in response: " + response.getErrors()); + } + } catch (Throwable e) { + if(handleError) { + error(sensorType, e, tupleList, messageGetStrategy); + } + else { + throw e; + } + } + finally { + sensorTupleMap.remove(sensorType); + sensorMessageMap.remove(sensorType); + } + long endTime = System.currentTimeMillis(); + long elapsed = endTime - startTime; + LOG.debug("Bulk batch for sensor {} completed in ~{} ns", sensorType, elapsed); + } + + // Flushes all queues older than their batchTimeouts. + public void flushTimeouts( + BulkMessageWriter<MESSAGE_T> bulkMessageWriter + , WriterConfiguration configurations + , MessageGetStrategy messageGetStrategy + ) throws Exception + { + // No need to do "all" sensorTypes here, just the ones that have data batched up. + // Note queues with batchSize == 1 don't get batched, so they never persist in the sensorTupleMap. 
+ for (String sensorType : sensorTupleMap.keySet()) { + long[] batchTimeoutInfo = batchTimeoutMap.get(sensorType); + if (batchTimeoutInfo == null //Shouldn't happen, but conservatively flush if so + || clock.currentTimeMillis() - batchTimeoutInfo[LAST_CREATE_TIME_MS] >= batchTimeoutInfo[TIMEOUT_MS]) { + flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy + , sensorTupleMap.get(sensorType), sensorMessageMap.get(sensorType)); + return; } - long endTime = System.nanoTime(); - long elapsed = endTime - startTime; - LOG.debug("Bulk batch completed in ~{} ns", elapsed); } } + + /** + * @param defaultBatchTimeout + */ + public void setDefaultBatchTimeout(int defaultBatchTimeout) { + this.defaultBatchTimeout = defaultBatchTimeout; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java new file mode 100644 index 0000000..22440ad --- /dev/null +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.writer.bolt; + +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Routines to help figure out the effective batchTimeout(s), using information from + * multiple configuration sources, topology.message.timeout.secs, and batchTimeoutDivisor, + * and use it to calculate defaultBatchTimeout and appropriate topology.tick.tuple.freq.secs. + * + * These methods cause no side effects outside of setting the internal member variables. + * "base" config are from defaults and storm.yaml (subordinate to Component config settings) + * "cli" config are from CLI arguments (superior to Component config settings) + * 0 or Integer.MAX_VALUE means disabled, except in batchTimeout values, where + * 0 means use the default. + * + * These lookups are fairly expensive, and changing the result values currently require + * a restart of the Storm topology anyway, so this implementation caches its results for + * re-reading. If you want different results, you'll need a new instance or a restart. 
+ */ +public class BatchTimeoutHelper { + + private static final Logger LOG = LoggerFactory + .getLogger(BulkMessageWriterBolt.class); + private boolean initialized = false; + private Supplier<List<Integer>> listAllConfiguredTimeouts; + protected int batchTimeoutDivisor; + protected int baseMessageTimeoutSecs; + protected int cliMessageTimeoutSecs; + protected int baseTickTupleFreqSecs; + protected int cliTickTupleFreqSecs; + protected int effectiveMessageTimeoutSecs; + protected int maxBatchTimeoutAllowedSecs; //derived from MessageTimeoutSecs value + protected int minBatchTimeoutRequestedSecs; //min of all sensorType configured batchTimeout requests + protected int recommendedTickIntervalSecs; //the answer + + BatchTimeoutHelper( Supplier<List<Integer>> listAllConfiguredTimeouts + , int batchTimeoutDivisor + ) + { + // The two arguments to the constructor are information only available at the Bolt object (batchTimeoutDivisor) + // and WriterConfiguration object (listAllConfiguredTimeouts). + this.batchTimeoutDivisor = batchTimeoutDivisor; + this.listAllConfiguredTimeouts = listAllConfiguredTimeouts; + // Reads and calculations are deferred until first call, then frozen for the duration of this BatchTimeoutHelper instance. 
+ } + + private synchronized void init() { + if (initialized) return; + readGlobalTimeoutConfigs(); + calcMaxBatchTimeoutAllowed(); + readMinBatchTimeoutRequested(); + calcRecommendedTickInterval(); + initialized = true; + } + + //modified from Utils.readStormConfig() + private Map readStormConfigWithoutCLI() { + Map ret = Utils.readDefaultConfig(); + String confFile = System.getProperty("storm.conf.file"); + Map storm; + if (confFile == null || confFile.equals("")) { + storm = Utils.findAndReadConfigFile("storm.yaml", false); + } else { + storm = Utils.findAndReadConfigFile(confFile, true); + } + ret.putAll(storm); + return ret; + } + + private void readGlobalTimeoutConfigs() { + Map stormConf = readStormConfigWithoutCLI(); + Map cliConf = Utils.readCommandLineOpts(); + //parameter TOPOLOGY_MESSAGE_TIMEOUT_SECS is declared @isInteger and @NotNull in storm-core (org.apache.storm.Config) + baseMessageTimeoutSecs = + (Integer) stormConf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 0); + cliMessageTimeoutSecs = + (Integer) cliConf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 0); + //parameter TOPOLOGY_TICK_TUPLE_FREQ_SECS is only declared @isInteger, and may in fact return null + Object scratch; + scratch = stormConf.getOrDefault(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 0); + baseTickTupleFreqSecs = (scratch == null) ? 0 : (Integer) scratch; + scratch = cliConf.getOrDefault(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 0); + cliTickTupleFreqSecs = (scratch == null) ? 0 : (Integer) scratch; + } + + private void calcMaxBatchTimeoutAllowed() { + // The max batchTimeout allowed also becomes the default batchTimeout. + effectiveMessageTimeoutSecs = (cliMessageTimeoutSecs == 0 ? baseMessageTimeoutSecs : cliMessageTimeoutSecs); + if (effectiveMessageTimeoutSecs == 0) { + LOG.info("topology.message.timeout.secs is disabled in both Storm config and CLI. 
Allowing unlimited batchTimeouts."); + maxBatchTimeoutAllowedSecs = Integer.MAX_VALUE; + } + else { + //Recommended value for safe batchTimeout is 1/2 * TOPOLOGY_MESSAGE_TIMEOUT_SECS. + //We further divide this by batchTimeoutDivisor for the particular Writer Bolt we are in, + //and subtract a delta of 1 second for surety (as well as rounding down). + //So if everything is defaulted, maxBatchTimeoutAllowedSecs is 14. + maxBatchTimeoutAllowedSecs = effectiveMessageTimeoutSecs / 2 / batchTimeoutDivisor - 1; + if (maxBatchTimeoutAllowedSecs <= 0) { //disallowed, and shouldn't happen with reasonable configs + maxBatchTimeoutAllowedSecs = 1; + } + } + } + + /** + * @return the max batchTimeout allowed, in seconds + * Guaranteed positive number. + */ + protected int getDefaultBatchTimeout() { + if (!initialized) {this.init();} + return maxBatchTimeoutAllowedSecs; + } + + private void readMinBatchTimeoutRequested() { + // The knowledge of how to list the currently configured batchTimeouts + // is delegated to the WriterConfiguration for the bolt that called us. + List<Integer> configuredTimeouts = listAllConfiguredTimeouts.get(); + + // Discard non-positive values, which mean "use default" + int minval = Integer.MAX_VALUE; + for (int k : configuredTimeouts) { + if (k < minval && k > 0) minval = k; + } + minBatchTimeoutRequestedSecs = minval; + } + + private void calcRecommendedTickInterval() { + recommendedTickIntervalSecs = Integer.min(minBatchTimeoutRequestedSecs, maxBatchTimeoutAllowedSecs); + //If needed, we can +=1 to assure triggering on each cycle, but this shouldn't be necessary. + //Note that this strategy means that sensors with batchTimeout requested less frequently + //may now have latency of "their requested batchTimeout" + "this recommended tick interval", + //in the worst case. + } + + /** + * @return the recommended TickInterval to request, in seconds + * Guaranteed positive number. 
+ */ + protected int getRecommendedTickInterval() { + if (!initialized) {this.init();} + // Remember that parameter settings in the CLI override parameter settings set by the Storm component. + // We shouldn't have to deal with this in the Metron environment, but just in case, + // warn if our recommended value will be overridden by cliTickTupleFreqSecs. + if (cliTickTupleFreqSecs > 0 && cliTickTupleFreqSecs > recommendedTickIntervalSecs) { + LOG.warn("Parameter '" + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + "' has been forced to value '" + + Integer.toString(cliTickTupleFreqSecs) + "' via CLI configuration. This will override the desired " + + "setting of '" + Integer.toString(recommendedTickIntervalSecs) + + "' and may lead to delayed batch flushing."); + } + if (cliTickTupleFreqSecs > 0 && cliTickTupleFreqSecs < recommendedTickIntervalSecs) { + LOG.info("Parameter '" + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + "' has been forced to value '" + + Integer.toString(cliTickTupleFreqSecs) + "' via CLI configuration. This will override the desired " + + "setting of '" + Integer.toString(recommendedTickIntervalSecs) + + "' and may lead to unexpected periodicity in batch flushing."); + } + return recommendedTickIntervalSecs; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index 6e0c371..9b4a6df 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -23,11 +23,13 @@ import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.message.MessageGetters; +import org.apache.metron.common.system.Clock; import org.apache.metron.common.utils.MessageUtils; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.MessageWriter; import org.apache.metron.writer.BulkWriterComponent; import org.apache.metron.writer.WriterToBulkWriter; +import org.apache.storm.Config; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -37,20 +39,27 @@ import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; import java.util.function.Function; +import static org.apache.storm.utils.TupleUtils.isTick; + public class BulkMessageWriterBolt extends
ConfiguredIndexingBolt { private static final Logger LOG = LoggerFactory .getLogger(BulkMessageWriterBolt.class); private BulkMessageWriter<JSONObject> bulkMessageWriter; - private BulkWriterComponent<JSONObject> writerComponent; + private BulkWriterComponent<JSONObject> writerComponent = null; private String messageGetStrategyType = MessageGetters.DEFAULT_JSON_FROM_FIELD.name(); private String messageGetField; private transient MessageGetStrategy messageGetStrategy; private transient OutputCollector collector; - private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation; + private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation = null; + private int requestedTickFreqSecs; + private int defaultBatchTimeout; + private int batchTimeoutDivisor = 1; + public BulkMessageWriterBolt(String zookeeperUrl) { super(zookeeperUrl); } @@ -75,6 +84,82 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { return this; } + /** + * If this BulkMessageWriterBolt is in a topology where it is daisy-chained with + * other queuing Writers, then the max amount of time it takes for a tuple + * to clear the whole topology is the sum of all the batchTimeouts for all the + * daisy-chained Writers. In the common case where each Writer is using the default + * batchTimeout, it is then necessary to divide that batchTimeout by the number of + * daisy-chained Writers. There are no examples of daisy-chained batching Writers + * in the current Metron topologies, but the feature is available as a "fluent"-style + * mutator if needed. It would be used in the topology set-up files such as + * metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml + * Default value, if not otherwise set, is 1. + * + * If non-default batchTimeouts are configured for some components, the administrator + * may want to take this behavior into account. 
+ * + * @param batchTimeoutDivisor + * @return + */ + public BulkMessageWriterBolt withBatchTimeoutDivisor(int batchTimeoutDivisor) { + if (batchTimeoutDivisor <= 0) { + throw new IllegalArgumentException(String.format("batchTimeoutDivisor must be positive. Value provided was %s", batchTimeoutDivisor)); + } + this.batchTimeoutDivisor = batchTimeoutDivisor; + return this; + } + + /** + * Used only for unit testing + * @param defaultBatchTimeout + */ + protected void setDefaultBatchTimeout(int defaultBatchTimeout) { + this.defaultBatchTimeout = defaultBatchTimeout; + } + + /** + * Used only for unit testing + */ + public int getDefaultBatchTimeout() { + return defaultBatchTimeout; + } + + /** + * This method is called by TopologyBuilder.createTopology() to obtain topology and + * bolt specific configuration parameters. We use it primarily to configure how often + * a tick tuple will be sent to our bolt. + * @return + */ + @Override + public Map<String, Object> getComponentConfiguration() { + // This is called long before prepare(), so do some of the same stuff as prepare() does, + // to get the valid WriterConfiguration. But don't store any non-serializable objects, + // else Storm will throw a runtime error. + Function<WriterConfiguration, WriterConfiguration> configurationXform; + if(bulkMessageWriter instanceof WriterToBulkWriter) { + configurationXform = WriterToBulkWriter.TRANSFORMATION; + } + else { + configurationXform = x -> x; + } + WriterConfiguration writerconf = configurationXform.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); + + BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor); + this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval(); + //And while we've got BatchTimeoutHelper handy, capture the defaultBatchTimeout for writerComponent. 
+ this.defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout(); + + Map<String, Object> conf = super.getComponentConfiguration(); + if (conf == null) { + conf = new HashMap<String, Object>(); + } + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, requestedTickFreqSecs); + LOG.info("Requesting " + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + " set to " + Integer.toString(requestedTickFreqSecs)); + return conf; + } + @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.writerComponent = new BulkWriterComponent<>(collector); @@ -92,24 +177,58 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { configurationTransformation = x -> x; } try { - bulkMessageWriter.init(stormConf, - context, - configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), - getConfigurations())) - ); + WriterConfiguration writerconf = configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); + if (defaultBatchTimeout == 0) { + //This means getComponentConfiguration was never called to initialize defaultBatchTimeout, + //probably because we are in a unit test scenario. So calculate it here. + BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor); + defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout(); + } + writerComponent.setDefaultBatchTimeout(defaultBatchTimeout); + bulkMessageWriter.init(stormConf, context, writerconf); } catch (Exception e) { throw new RuntimeException(e); } } + /** + * Used only for unit testing. 
+ */ + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, Clock clock) { + prepare(stormConf, context, collector); + writerComponent.withClock(clock); + } + @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { - JSONObject message = (JSONObject) messageGetStrategy.get(tuple); - String sensorType = MessageUtils.getSensorType(message); + if (isTick(tuple)) { + try { + if (!(bulkMessageWriter instanceof WriterToBulkWriter)) { + //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. + LOG.debug("Flushing message queues older than their batchTimeouts"); + writerComponent.flushTimeouts(bulkMessageWriter, configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) + , messageGetStrategy); + } + } + catch(Exception e) { + throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); + } + finally { + collector.ack(tuple); + } + return; + } + try { - WriterConfiguration writerConfiguration = configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); + JSONObject message = (JSONObject) messageGetStrategy.get(tuple); + String sensorType = MessageUtils.getSensorType(message); + LOG.trace("Writing enrichment message: {}", message); + WriterConfiguration writerConfiguration = configurationTransformation.apply( + new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); if(writerConfiguration.isDefault(sensorType)) { //want to warn, but not fail the tuple collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType)); @@ -121,7 +240,6 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { , writerConfiguration , messageGetStrategy ); - LOG.trace("Writing enrichment message: {}", message); } catch(Exception e) { throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e); http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java new file mode 100644 index 0000000..6d9b62e --- /dev/null +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.metron.writer.bolt; + +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; + +/** + * + */ +public class BatchTimeoutHelperTest { + private final TimeoutListSupplier defaultConfigList = new TimeoutListSupplier(Arrays.asList()); + private final TimeoutListSupplier disabledConfigList = new TimeoutListSupplier(Arrays.asList(0,0)); + private final TimeoutListSupplier smallTimeoutsList = new TimeoutListSupplier(Arrays.asList(5, 2, 4, 6)); + private final TimeoutListSupplier largeTimeoutsList = new TimeoutListSupplier(Arrays.asList(100, 200, 150, 500)); + private final TimeoutListSupplier illegalTimeoutsList = new TimeoutListSupplier(Arrays.asList(5, 2, -3, 6)); + + @Test + public void testGetDefaultBatchTimeout() throws Exception { + //The defaultBatchTimeout is dependent only on batchTimeoutDivisor and the Storm config + //and CLI overrides, which aren't of interest here. 
+ assertEquals(30, Utils.readStormConfig().getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 0)); + BatchTimeoutHelper bth; + bth = new BatchTimeoutHelper(defaultConfigList, 1); + assertEquals(14, bth.getDefaultBatchTimeout()); + bth = new BatchTimeoutHelper(defaultConfigList, 2); + assertEquals(6, bth.getDefaultBatchTimeout()); + bth = new BatchTimeoutHelper(defaultConfigList, 3); + assertEquals(4, bth.getDefaultBatchTimeout()); + bth = new BatchTimeoutHelper(defaultConfigList, 4); + assertEquals(2, bth.getDefaultBatchTimeout()); + bth = new BatchTimeoutHelper(defaultConfigList, 6); + assertEquals(1, bth.getDefaultBatchTimeout()); + bth = new BatchTimeoutHelper(defaultConfigList, 20); + assertEquals(1, bth.getDefaultBatchTimeout()); + + bth = new BatchTimeoutHelper(disabledConfigList, 2); + assertEquals(6, bth.getDefaultBatchTimeout()); + bth = new BatchTimeoutHelper(smallTimeoutsList, 2); + assertEquals(6, bth.getDefaultBatchTimeout()); + } + + @Test + public void testGetRecommendedTickInterval() throws Exception { + //The recommendedTickInterval is the min of defaultBatchTimeout and the configured TimeoutsList. + BatchTimeoutHelper bth; + bth = new BatchTimeoutHelper(defaultConfigList, 2); + assertEquals(6, bth.getRecommendedTickInterval()); + bth = new BatchTimeoutHelper(disabledConfigList, 2); + assertEquals(6, bth.getRecommendedTickInterval()); + bth = new BatchTimeoutHelper(largeTimeoutsList, 2); + assertEquals(6, bth.getRecommendedTickInterval()); + bth = new BatchTimeoutHelper(smallTimeoutsList, 2); + assertEquals(2, bth.getRecommendedTickInterval()); + bth = new BatchTimeoutHelper(illegalTimeoutsList, 2); + assertEquals(2, bth.getRecommendedTickInterval()); + } + + public static class TimeoutListSupplier implements Supplier<List<Integer>> { + private List<Integer> list; + public TimeoutListSupplier(List<Integer> list) { + this.list = list; + } + @Override + public List<Integer> get() { + return list; + } + } + +}