Repository: metron
Updated Branches:
  refs/heads/master 7d554444e -> f072ed231


METRON-322 Global Batching and Flushing (mattf-horton) closes apache/metron#481


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/f072ed23
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/f072ed23
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/f072ed23

Branch: refs/heads/master
Commit: f072ed231f9acdf0957f0244b6402b2ecd9403a0
Parents: 7d55444
Author: mattf-horton <mfo...@hortonworks.com>
Authored: Thu Aug 10 17:38:59 2017 -0700
Committer: mattf <ma...@apache.org>
Committed: Thu Aug 10 17:38:59 2017 -0700

----------------------------------------------------------------------
 .../configuration/IndexingConfigurations.java   |  52 ++++-
 .../writer/IndexingWriterConfiguration.java     |  11 ++
 .../writer/ParserWriterConfiguration.java       |  26 ++-
 .../writer/SingleBatchConfigurationFacade.java  |  13 ++
 .../writer/WriterConfiguration.java             |   3 +
 .../apache/metron/common/system/FakeClock.java  | 162 ++++++++++++++++
 .../writer/IndexingWriterConfigurationTest.java |  30 ++-
 .../bolt/BulkMessageWriterBoltTest.java         | 162 +++++++++++++---
 metron-platform/metron-indexing/README.md       |  19 +-
 .../main/config/zookeeper/indexing/test.json    |   3 +-
 metron-platform/metron-management/README.md     |   5 +-
 .../management/IndexingConfigFunctions.java     |   8 +-
 .../management/IndexingConfigFunctionsTest.java |  12 +-
 .../SimpleHBaseEnrichmentWriterTest.java        |  12 ++
 .../test/bolt/BaseEnrichmentBoltTest.java       |   6 +-
 .../metron/writer/BulkWriterComponent.java      | 192 +++++++++++++++----
 .../metron/writer/bolt/BatchTimeoutHelper.java  | 182 ++++++++++++++++++
 .../writer/bolt/BulkMessageWriterBolt.java      | 140 ++++++++++++--
 .../writer/bolt/BatchTimeoutHelperTest.java     |  92 +++++++++
 19 files changed, 1039 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
index 9baf8be..1d60084 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
@@ -24,11 +24,11 @@ import org.apache.metron.common.utils.JSONUtils;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 public class IndexingConfigurations extends Configurations {
   public static final String BATCH_SIZE_CONF = "batchSize";
+  public static final String BATCH_TIMEOUT_CONF = "batchTimeout";
   public static final String ENABLED_CONF = "enabled";
   public static final String INDEX_CONF = "index";
   public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction";
@@ -74,7 +74,39 @@ public class IndexingConfigurations extends Configurations {
   }
 
   public int getBatchSize(String sensorName, String writerName ) {
-     return getBatchSize(getSensorIndexingConfig(sensorName, writerName));
+    return getBatchSize(getSensorIndexingConfig(sensorName, writerName));
+  }
+
+  public int getBatchTimeout(String sensorName, String writerName ) {
+    return getBatchTimeout(getSensorIndexingConfig(sensorName, writerName));
+  }
+
+  /**
+   * Returns all configured values of batchTimeout, for all configured sensors,
+   * but only for the specific writer identified by {@code writerName}.  So, if it is
+   * an hdfs writer, it will return the batchTimeouts for hdfs writers for all the sensors.
+   * The goal is to return to a {@link org.apache.metron.common.bolt.ConfiguredBolt}
+   * the set of all and only batchTimeouts relevant to that ConfiguredBolt.
+   *
+   * @param writerName the writer whose batchTimeouts are requested
+   * @return list of integer batchTimeouts, one per configured sensor
+   */
+  public List<Integer> getAllConfiguredTimeouts(String writerName) {
+    // The configuration infrastructure was not designed to enumerate sensors, so we synthesize.
+    // Since getKey is in this same class, we know we can pass it an empty string to get the key prefix
+    // for all sensor types within this capability.  We then enumerate all keys in configurations.keySet
+    // and select those that match the key prefix, as being sensor keys.  The suffix substring of
+    // each such key is used as a sensor name to query the batchTimeout settings, if any.
+    String keyPrefixString = getKey("");
+    int prefixStringLength = keyPrefixString.length();
+    List<Integer> configuredBatchTimeouts = new ArrayList<>();
+    for (String sensorKeyString : configurations.keySet()) {
+      if (sensorKeyString.startsWith(keyPrefixString)) {
+        String configuredSensorName = sensorKeyString.substring(prefixStringLength);
+        configuredBatchTimeouts.add(getBatchTimeout(configuredSensorName, writerName));
+      }
+    }
+    return configuredBatchTimeouts;
   }
 
   public String getIndex(String sensorName, String writerName) {
@@ -105,6 +137,14 @@ public class IndexingConfigurations extends Configurations {
                 );
   }
 
+  public static int getBatchTimeout(Map<String, Object> conf) {
+    return getAs( BATCH_TIMEOUT_CONF
+                 ,conf
+                , 0
+                , Integer.class
+                );
+  }
+
   public static String getIndex(Map<String, Object> conf, String sensorName) {
     return getAs( INDEX_CONF
                  ,conf
@@ -132,6 +172,12 @@ public class IndexingConfigurations extends Configurations {
     return ret;
   }
 
+  public static Map<String, Object> setBatchTimeout(Map<String, Object> conf, int batchTimeout) {
+    Map<String, Object> ret = conf == null?new HashMap<>():conf;
+    ret.put(BATCH_TIMEOUT_CONF, batchTimeout);
+    return ret;
+  }
+
   public static Map<String, Object> setIndex(Map<String, Object> conf, String index) {
     Map<String, Object> ret = conf == null?new HashMap<>():conf;
     ret.put(INDEX_CONF, index);

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
index b6b2a4b..beb9373 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.metron.common.configuration.writer;
 
 import org.apache.metron.common.configuration.IndexingConfigurations;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -38,6 +39,16 @@ public class IndexingWriterConfiguration implements WriterConfiguration{
   }
 
   @Override
+  public int getBatchTimeout(String sensorName) {
+    return config.orElse(new IndexingConfigurations()).getBatchTimeout(sensorName, writerName);
+  }
+
+  @Override
+  public List<Integer> getAllConfiguredTimeouts() {
+    return config.orElse(new IndexingConfigurations()).getAllConfiguredTimeouts(writerName);
+  }
+
+  @Override
   public String getIndex(String sensorName) {
     return config.orElse(new IndexingConfigurations()).getIndex(sensorName, writerName);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
index 821ef4a..ae74c65 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
@@ -22,6 +22,8 @@ import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 public class ParserWriterConfiguration implements WriterConfiguration {
@@ -32,9 +34,9 @@ public class ParserWriterConfiguration implements WriterConfiguration {
   @Override
   public int getBatchSize(String sensorName) {
     if(config != null
-    && config.getSensorParserConfig(sensorName) != null
-    && config.getSensorParserConfig(sensorName).getParserConfig() != null
-      ) {
+            && config.getSensorParserConfig(sensorName) != null
+            && config.getSensorParserConfig(sensorName).getParserConfig() != null
+            ) {
       Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.BATCH_SIZE_CONF);
       return batchObj == null ? 1 : ConversionUtils.convert(batchObj, Integer.class);
     }
@@ -42,6 +44,24 @@ public class ParserWriterConfiguration implements WriterConfiguration {
   }
 
   @Override
+  public int getBatchTimeout(String sensorName) {
+    if(config != null
+            && config.getSensorParserConfig(sensorName) != null
+            && config.getSensorParserConfig(sensorName).getParserConfig() != null
+            ) {
+      Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.BATCH_TIMEOUT_CONF);
+      return batchObj == null ? 0 : ConversionUtils.convert(batchObj, Integer.class);
+    }
+    return 0;
+  }
+
+  @Override
+  public List<Integer> getAllConfiguredTimeouts() {
+    // TODO - stub implementation pending METRON-750
+    return new ArrayList<Integer>();
+  }
+
+  @Override
   public String getIndex(String sensorName) {
     if(config != null && config.getSensorParserConfig(sensorName) != null
     && config.getSensorParserConfig(sensorName).getParserConfig() != null

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
index 69e5541..e50bd2b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
@@ -18,6 +18,8 @@
 
 package org.apache.metron.common.configuration.writer;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 public class SingleBatchConfigurationFacade implements WriterConfiguration {
@@ -32,6 +34,17 @@ public class SingleBatchConfigurationFacade implements WriterConfiguration {
   }
 
   @Override
+  public int getBatchTimeout(String sensorName) {
+    return 0;
+  }
+
+  @Override
+  public List<Integer> getAllConfiguredTimeouts() {
+    // no-op implementation, since batching is disabled in this facade
+    return new ArrayList<Integer>();
+  }
+
+  @Override
   public String getIndex(String sensorName) {
     return config.getIndex(sensorName);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
index 45271e8..2354f95 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
@@ -19,10 +19,13 @@
 package org.apache.metron.common.configuration.writer;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 public interface WriterConfiguration extends Serializable {
   int getBatchSize(String sensorName);
+  int getBatchTimeout(String sensorName);
+  List<Integer> getAllConfiguredTimeouts();
   String getIndex(String sensorName);
   boolean isEnabled(String sensorName);
   Map<String, Object> getSensorConfig(String sensorName);

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/FakeClock.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/FakeClock.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/FakeClock.java
new file mode 100644
index 0000000..e34a26d
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/system/FakeClock.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.system;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A fake clock for test purposes, that starts out at time zero (epoch), and
+ * never advances itself, but allows you to increment it by any desired amount.
+ *
+ * Note that the base class is not the Java 8 Clock, but rather the Clock we
+ * defined in {@link org.apache.metron.common.system.Clock}.  Fundamental units of time
+ * are milliseconds.
+ *
+ * Three exceptions are also defined: {@link IllegalArgumentClockNegative},
+ * {@link IllegalArgumentClockZero}, and {@link IllegalArgumentClockOverflow}.
+ * These are thrown in various circumstances that imply the FakeClock is
+ * being used outside of its design intent.  They are subclasses of IllegalArgumentException,
+ * hence unchecked.
+ */
+public class FakeClock extends Clock {
+  private long now_ms = 0;
+
+  @Override
+  public long currentTimeMillis() {
+    return now_ms;
+  }
+
+  /**
+   * Advance the fake clock by a number of milliseconds.
+   * @param duration_ms the number of milliseconds to elapse
+   *
+   * @throws IllegalArgumentClockNegative (unchecked) if you try to go backwards in time.
+   * This is not an allowed behavior, because most system clocks go to great
+   * effort to make sure it never happens, even with, e.g., anomalous events
+   * from a bad NTP server.
+   * If we really get a demand for this capability, we'll add methods that don't
+   * check for this.
+   * @throws IllegalArgumentClockOverflow (unchecked) if you try to add a duration
+   * that would overflow the long value of {@link #currentTimeMillis()}
+   */
+  public void elapseMillis(long duration_ms) {
+    long instant_ms = now_ms + duration_ms;
+    if (duration_ms < 0) {
+      throw new IllegalArgumentClockNegative(String.format(
+              "Attempted to move backward in time, by %d milliseconds."
+              , duration_ms));
+    }
+    else if (instant_ms < 0) {
+      throw new IllegalArgumentClockOverflow(String.format(
+              "Attempted to advance beyond the edge of time, to epoch %d + %d."
+              , now_ms, duration_ms));
+    }
+    now_ms = instant_ms;
+  }
+
+  /**
+   * Advance the fake clock by a number of seconds.
+   * See {@link #elapseMillis(long)} for details.
+   *
+   * @param duration_secs the number of seconds to elapse
+   */
+  public void elapseSeconds(long duration_secs) {
+    elapseMillis(TimeUnit.SECONDS.toMillis(duration_secs));
+  }
+
+  /**
+   * Advance the fake clock to a point in time specified as milliseconds after 0.
+   * @param instant_ms - epoch time in milliseconds
+   *
+   * @throws IllegalArgumentClockNegative (unchecked) if you try to go backwards in time.
+   * This is not an allowed behavior, because most system clocks go to great
+   * effort to make sure it never happens, even with, e.g., anomalous events
+   * from a bad NTP server.
+   * If we really get a demand for this capability, we'll add methods that don't
+   * check for this.
+   * @throws IllegalArgumentClockZero (unchecked) if you try to "advance" the clock to the time it already is.
+   * Why?  Because it implies your test code has lost track of previous increments,
+   * which might be problematic, so we do this in the spirit of "fail fast".
+   * If you *meant* to lose track, for instance if you were using random numbers of events,
+   * or whatever, you can always orient yourself in time by reading {@link #currentTimeMillis()}.
+   */
+  public void advanceToMillis(long instant_ms) {
+    if (instant_ms < now_ms) {
+      throw new IllegalArgumentClockNegative(String.format(
+              "Attempted to move backward in time, from epoch %d to %d."
+              , now_ms, instant_ms));
+    }
+    if (instant_ms == now_ms) {
+      throw new IllegalArgumentClockZero(String.format(
+              "Time was set to current time, with null advance, at epoch %d."
+              , now_ms));
+    }
+    now_ms = instant_ms;
+  }
+
+  /**
+   * Advance the fake clock to a point in time specified as seconds after 0.
+   * See {@link #advanceToMillis(long)} for details.
+   *
+   * @param instant_secs - epoch time in seconds
+   */
+  public void advanceToSeconds(long instant_secs) {
+    advanceToMillis(TimeUnit.SECONDS.toMillis(instant_secs));
+  }
+
+  /**
+   * IllegalArgumentClockNegative (unchecked) is thrown if you try to go backwards in time.
+   * This is not an allowed behavior, because most system clocks go to great
+   * effort to make sure it never happens, even with, e.g., anomalous events
+   * from a bad NTP server.
+   * If we really get a demand for this capability, we'll add methods that don't
+   * check for this.
+   */
+  public static class IllegalArgumentClockNegative extends IllegalArgumentException {
+    public IllegalArgumentClockNegative(String s) {
+      super(s);
+    }
+  }
+
+  /**
+   * IllegalArgumentClockZero (unchecked) is thrown if you try to "advance" the clock to the time it already is.
+   * Why?  Because it implies your test code has lost track of previous increments,
+   * which might be problematic, so we do this in the spirit of "fail fast".
+   * If you *meant* to lose track, for instance if you were using random numbers of events,
+   * or whatever, you can always orient yourself in time by reading {@link #currentTimeMillis()}.
+   *
+   * Note that this argument does not apply to elapseMillis(0), so it does not throw
+   * this exception.
+   */
+  public static class IllegalArgumentClockZero extends IllegalArgumentException {
+    public IllegalArgumentClockZero(String s) {
+      super(s);
+    }
+  }
+
+  /**
+   * IllegalArgumentClockOverflow (unchecked) is thrown if you try to add a duration
+   * that would overflow the long value of {@link #currentTimeMillis()}.
+   */
+  public static class IllegalArgumentClockOverflow extends IllegalArgumentException {
+    public IllegalArgumentClockOverflow(String s) {
+      super(s);
+    }
+  }
+
+}
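
For reviewers, a minimal usage sketch of the new FakeClock (illustrative only, not part of the commit; the class name FakeClockSketch is invented here):

    import org.apache.metron.common.system.FakeClock;

    public class FakeClockSketch {
      public static void main(String[] args) {
        FakeClock clock = new FakeClock();              // starts at epoch 0 ms
        clock.elapseSeconds(5);                         // now at 5000 ms
        clock.elapseMillis(250);                        // now at 5250 ms
        clock.advanceToSeconds(9);                      // jump forward to 9000 ms
        System.out.println(clock.currentTimeMillis());  // prints 9000
        // Going backwards (e.g. clock.elapseMillis(-1)) throws IllegalArgumentClockNegative,
        // and advancing to the current instant again throws IllegalArgumentClockZero.
      }
    }

The new tests in BulkMessageWriterBoltTest below drive the clock the same way (elapseSeconds/advanceToSeconds) to trigger batchTimeout flushes deterministically.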

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
index 94da965..aec215f 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
@@ -18,12 +18,18 @@
 
 package org.apache.metron.common.writer;
 
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sampleSensorIndexingConfigPath;
+import static org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sensorType;
+
 public class IndexingWriterConfigurationTest {
   @Test
   public void testDefaultBatchSize() {
@@ -33,6 +39,28 @@ public class IndexingWriterConfigurationTest {
     Assert.assertEquals(1, config.getBatchSize("foo"));
   }
   @Test
+  public void testDefaultBatchTimeout() {
+    IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs",
+           new IndexingConfigurations()
+    );
+    Assert.assertEquals(0, config.getBatchTimeout("foo"));
+  }
+  @Test
+  public void testGetAllConfiguredTimeouts() throws FileNotFoundException, IOException {
+    //default
+    IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs",
+            new IndexingConfigurations()
+    );
+    Assert.assertEquals(0, config.getAllConfiguredTimeouts().size());
+    //non-default
+    IndexingConfigurations iconfigs = new IndexingConfigurations();
+    iconfigs.updateSensorIndexingConfig(
+            sensorType, new FileInputStream(sampleSensorIndexingConfigPath));
+    config = new IndexingWriterConfiguration("elasticsearch", iconfigs);
+    Assert.assertEquals(1, config.getAllConfiguredTimeouts().size());
+    Assert.assertEquals(7, (long)config.getAllConfiguredTimeouts().get(0));
+  }
+  @Test
   public void testDefaultIndex() {
     IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs",
            new IndexingConfigurations()

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 79d8285..5f6f22f 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -17,21 +17,22 @@
  */
 package org.apache.metron.enrichment.bolt;
 
+import org.adrianwalker.multilinestring.Multiline;
 import org.apache.log4j.Level;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.system.FakeClock;
+import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.metron.writer.BulkWriterComponent;
+import org.apache.metron.writer.bolt.BulkMessageWriterBolt;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.metron.common.writer.BulkMessageWriter;
-import org.apache.metron.writer.bolt.BulkMessageWriterBolt;
 import org.hamcrest.Description;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -48,15 +49,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
 
@@ -93,23 +91,24 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
 
   private JSONObject sampleMessage;
   private List<JSONObject> messageList;
+  private List<JSONObject> fullMessageList;
   private List<Tuple> tupleList;
 
   @Before
   public void parseMessages() throws ParseException {
     JSONParser parser = new JSONParser();
+    fullMessageList = new ArrayList<>();
     sampleMessage = (JSONObject) parser.parse(sampleMessageString);
     sampleMessage.put("field", "value1");
-    messageList = new ArrayList<>();
-    messageList.add(((JSONObject) sampleMessage.clone()));
+    fullMessageList.add(((JSONObject) sampleMessage.clone()));
     sampleMessage.put("field", "value2");
-    messageList.add(((JSONObject) sampleMessage.clone()));
+    fullMessageList.add(((JSONObject) sampleMessage.clone()));
     sampleMessage.put("field", "value3");
-    messageList.add(((JSONObject) sampleMessage.clone()));
+    fullMessageList.add(((JSONObject) sampleMessage.clone()));
     sampleMessage.put("field", "value4");
-    messageList.add(((JSONObject) sampleMessage.clone()));
+    fullMessageList.add(((JSONObject) sampleMessage.clone()));
     sampleMessage.put("field", "value5");
-    messageList.add(((JSONObject) sampleMessage.clone()));
+    fullMessageList.add(((JSONObject) sampleMessage.clone()));
   }
 
   @Mock
@@ -119,14 +118,17 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
   private MessageGetStrategy messageGetStrategy;
 
   @Test
-  public void test() throws Exception {
+  public void testFlushOnBatchSize() throws Exception {
     BulkMessageWriterBolt bulkMessageWriterBolt = new 
BulkMessageWriterBolt("zookeeperUrl")
-            
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()).withMessageGetterField("message");
+            
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+            .withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setTreeCache(cache);
-    
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
 new FileInputStream(sampleSensorIndexingConfigPath));
+    
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
+            new FileInputStream(sampleSensorIndexingConfigPath));
     bulkMessageWriterBolt.declareOutputFields(declarer);
-    verify(declarer, times(1)).declareStream(eq("error"), argThat(new 
FieldsMatcher("message")));
+    verify(declarer, times(1)).declareStream(eq("error"), argThat(
+            new FieldsMatcher("message")));
     Map stormConf = new HashMap();
     doThrow(new 
Exception()).when(bulkMessageWriter).init(eq(stormConf),any(TopologyContext.class),
 any(WriterConfiguration.class));
     try {
@@ -138,23 +140,31 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
     bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
     verify(bulkMessageWriter, 
times(1)).init(eq(stormConf),any(TopologyContext.class), 
any(WriterConfiguration.class));
     tupleList = new ArrayList<>();
+    messageList = new ArrayList<>();
     for(int i = 0; i < 4; i++) {
-      when(tuple.getValueByField("message")).thenReturn(messageList.get(i));
+      
when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(i));
       tupleList.add(tuple);
+      messageList.add(fullMessageList.get(i));
       bulkMessageWriterBolt.execute(tuple);
-      verify(bulkMessageWriter, times(0)).write(eq(sensorType), 
any(WriterConfiguration.class), eq(tupleList), eq(messageList));
+      verify(bulkMessageWriter, times(0)).write(eq(sensorType)
+              , any(WriterConfiguration.class), eq(tupleList), 
eq(messageList));
     }
-    when(tuple.getValueByField("message")).thenReturn(messageList.get(4));
+    when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(4));
     tupleList.add(tuple);
+    messageList.add(fullMessageList.get(4));
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllSuccesses(tupleList);
-    when(bulkMessageWriter.write(eq(sensorType), 
any(WriterConfiguration.class), eq(tupleList), argThat(new 
MessageListMatcher(messageList)))).thenReturn(response);
+    when(bulkMessageWriter.write(eq(sensorType), 
any(WriterConfiguration.class), eq(tupleList)
+            , argThat(new 
MessageListMatcher(messageList)))).thenReturn(response);
     bulkMessageWriterBolt.execute(tuple);
-    verify(bulkMessageWriter, times(1)).write(eq(sensorType), 
any(WriterConfiguration.class), eq(tupleList), argThat(new 
MessageListMatcher(messageList)));
+    verify(bulkMessageWriter, times(1)).write(eq(sensorType)
+            , any(WriterConfiguration.class), eq(tupleList)
+            , argThat(new MessageListMatcher(messageList)));
     verify(outputCollector, times(5)).ack(tuple);
     reset(outputCollector);
-    doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), 
any(WriterConfiguration.class), Matchers.anyListOf(Tuple.class), 
Matchers.anyListOf(JSONObject.class));
-    when(tuple.getValueByField("message")).thenReturn(messageList.get(0));
+    doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), 
any(WriterConfiguration.class)
+            , Matchers.anyListOf(Tuple.class), 
Matchers.anyListOf(JSONObject.class));
+    when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(0));
     UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.FATAL);
     for(int i = 0; i < 5; i++) {
       bulkMessageWriterBolt.execute(tuple);
@@ -164,4 +174,100 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), 
any(Values.class));
     verify(outputCollector, times(1)).reportError(any(Throwable.class));
   }
+
+  @Test
+  public void testFlushOnBatchTimeout() throws Exception {
+    FakeClock clock = new FakeClock();
+    BulkMessageWriterBolt bulkMessageWriterBolt = new 
BulkMessageWriterBolt("zookeeperUrl")
+            
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+            .withMessageGetterField("message").withBatchTimeoutDivisor(3);
+    bulkMessageWriterBolt.setCuratorFramework(client);
+    bulkMessageWriterBolt.setTreeCache(cache);
+    
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
+            new FileInputStream(sampleSensorIndexingConfigPath));
+    bulkMessageWriterBolt.declareOutputFields(declarer);
+    verify(declarer, times(1)).declareStream(eq("error")
+            , argThat(new FieldsMatcher("message")));
+    Map stormConf = new HashMap();
+    when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
+    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, 
clock);
+    verify(bulkMessageWriter, 
times(1)).init(eq(stormConf),any(TopologyContext.class), 
any(WriterConfiguration.class));
+    int batchTimeout = bulkMessageWriterBolt.getDefaultBatchTimeout();
+    assertEquals(4, batchTimeout);
+    tupleList = new ArrayList<>();
+    messageList = new ArrayList<>();
+    for(int i = 0; i < 3; i++) {
+      
when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(i));
+      tupleList.add(tuple);
+      messageList.add(fullMessageList.get(i));
+      bulkMessageWriterBolt.execute(tuple);
+      verify(bulkMessageWriter, times(0)).write(eq(sensorType)
+              , any(WriterConfiguration.class), eq(tupleList), 
eq(messageList));
+    }
+    clock.elapseSeconds(5);
+    when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(3));
+    tupleList.add(tuple);
+    messageList.add(fullMessageList.get(3));
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addAllSuccesses(tupleList);
+    when(bulkMessageWriter.write(eq(sensorType), 
any(WriterConfiguration.class), eq(tupleList)
+            , argThat(new 
MessageListMatcher(messageList)))).thenReturn(response);
+    bulkMessageWriterBolt.execute(tuple);
+    verify(bulkMessageWriter, times(1)).write(eq(sensorType)
+            , any(WriterConfiguration.class)
+            , eq(tupleList), argThat(new MessageListMatcher(messageList)));
+    verify(outputCollector, times(4)).ack(tuple);
+  }
+
+  @Test
+  public void testFlushOnTickTuple() throws Exception {
+    FakeClock clock = new FakeClock();
+    BulkMessageWriterBolt bulkMessageWriterBolt = new 
BulkMessageWriterBolt("zookeeperUrl")
+            
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+            .withMessageGetterField("message");
+    bulkMessageWriterBolt.setCuratorFramework(client);
+    bulkMessageWriterBolt.setTreeCache(cache);
+    
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType
+            , new FileInputStream(sampleSensorIndexingConfigPath));
+    bulkMessageWriterBolt.declareOutputFields(declarer);
+    verify(declarer, times(1)).declareStream(eq("error")
+            , argThat(new FieldsMatcher("message")));
+    Map stormConf = new HashMap();
+    when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
+    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, 
clock);
+    verify(bulkMessageWriter, 
times(1)).init(eq(stormConf),any(TopologyContext.class)
+            , any(WriterConfiguration.class));
+    int batchTimeout = bulkMessageWriterBolt.getDefaultBatchTimeout();
+    assertEquals(14, batchTimeout);
+    tupleList = new ArrayList<>();
+    messageList = new ArrayList<>();
+    for(int i = 0; i < 3; i++) {
+      
when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(i));
+      tupleList.add(tuple);
+      messageList.add(fullMessageList.get(i));
+      bulkMessageWriterBolt.execute(tuple);
+      verify(bulkMessageWriter, times(0)).write(eq(sensorType)
+              , any(WriterConfiguration.class), eq(tupleList), 
eq(messageList));
+    }
+    when(tuple.getValueByField("message")).thenReturn(null);
+    when(tuple.getSourceComponent()).thenReturn("__system"); //mark the tuple as a TickTuple, part 1 of 2
+    when(tuple.getSourceStreamId()).thenReturn("__tick");    //mark the tuple as a TickTuple, part 2 of 2
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addAllSuccesses(tupleList);
+    when(bulkMessageWriter.write(eq(sensorType), 
any(WriterConfiguration.class), eq(tupleList)
+            , argThat(new 
MessageListMatcher(messageList)))).thenReturn(response);
+    clock.advanceToSeconds(2);
+    bulkMessageWriterBolt.execute(tuple);
+    verify(bulkMessageWriter, times(0)).write(eq(sensorType)
+            , any(WriterConfiguration.class)
+            , eq(tupleList), argThat(new MessageListMatcher(messageList)));
+    verify(outputCollector, times(1)).ack(tuple);  // 1 tick
+    clock.advanceToSeconds(9);
+    bulkMessageWriterBolt.execute(tuple);
+    verify(bulkMessageWriter, times(1)).write(eq(sensorType)
+            , any(WriterConfiguration.class)
+            , eq(tupleList), argThat(new MessageListMatcher(messageList)));
+    assertEquals(3, tupleList.size());
+    verify(outputCollector, times(5)).ack(tuple);  // 3 messages + 2nd tick
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-indexing/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md
index 2095d0f..aea670c 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -11,9 +11,9 @@ from the enrichment topology that have been enriched and storing the data in one
 By default, this topology writes out to both HDFS and one of
 Elasticsearch and Solr.
 
-Indices are written in batch and the batch size is specified in the
-[Sensor Indexing Configuration](#sensor-indexing-configuration) via the `batchSize` parameter.
-This config is variable by sensor type.
+Indices are written in batch and the batch size and batch timeout are specified in the
+[Sensor Indexing Configuration](#sensor-indexing-configuration) via the `batchSize` and `batchTimeout` parameters.
+These configs are variable by sensor type.
 
 ## Indexing Architecture
 
@@ -40,7 +40,10 @@ elasticsearch or solr and hdfs writers running.
 
 The configuration for an individual writer-specific configuration is a JSON map with the following fields:
 * `index` : The name of the index to write to (defaulted to the name of the sensor).
-* `batchSize` : The size of the batch that is written to the indices at once (defaulted to `1`).
+* `batchSize` : The size of the batch that is written to the indices at once. Defaults to `1` (no batching).
+* `batchTimeout` : The timeout after which a batch will be flushed even if `batchSize` has not been met.  Optional.
+If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm
+parameter `topology.message.timeout.secs`.  Ignored if `batchSize` is `1`, since this disables batching.
 * `enabled` : Whether the writer is enabled (default `true`).
 
 ### Indexing Configuration Examples
@@ -56,10 +59,12 @@ or no file at all.
 * elasticsearch writer
   * enabled
   * batch size of 1
+  * batch timeout system default
   * index name the same as the sensor
 * hdfs writer
   * enabled
   * batch size of 1
+  * batch timeout system default
   * index name the same as the sensor
 
 If a writer config is unspecified, then a warning is indicated in the
@@ -72,11 +77,13 @@ Storm console.  e.g.:
    "elasticsearch": {
       "index": "foo",
       "batchSize" : 100,
+      "batchTimeout" : 0,
       "enabled" : true 
     },
    "hdfs": {
       "index": "foo",
       "batchSize": 1,
+      "batchTimeout" : 0,
       "enabled" : true
     }
 }
@@ -84,10 +91,12 @@ Storm console.  e.g.:
 * elasticsearch writer
   * enabled
   * batch size of 100
+  * batch timeout system default
   * index name of "foo"
 * hdfs writer
   * enabled
   * batch size of 1
+  * batch timeout system default
   * index name of "foo"
 
 #### HDFS Writer turned off
@@ -100,6 +109,7 @@ Storm console.  e.g.:
    "hdfs": {
       "index": "foo",
       "batchSize": 100,
+      "batchTimeout" : 0,
       "enabled" : false
     }
 }
@@ -107,6 +117,7 @@ Storm console.  e.g.:
 * elasticsearch writer
   * enabled
   * batch size of 1
+  * batch timeout system default
   * index name of "foo"
 * hdfs writer
   * disabled

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
index 0197f0c..17ed56c 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
@@ -6,7 +6,8 @@
   },
   "elasticsearch" : {
     "index": "yaf",
-    "batchSize": 5,
+    "batchSize": 25,
+    "batchTimeout": 7,
     "enabled" : true
   },
   "solr" : {

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/README.md b/metron-platform/metron-management/README.md
index d256885..07c6908 100644
--- a/metron-platform/metron-management/README.md
+++ b/metron-platform/metron-management/README.md
@@ -23,7 +23,7 @@ The functions are split roughly into a few sections:
 * File functions - Functions around interacting with local or HDFS files
 * Configuration functions - Functions surrounding pulling and pushing configs from zookeeper
 * Parser functions - Functions surrounding adding, viewing, and removing Parser functions.
-* Enrichment functions - Functions surrounding adding, viewing and removing Stellar enrichments as well as managing batch size and index names for the enrichment topology configuration
+* Enrichment functions - Functions surrounding adding, viewing and removing Stellar enrichments as well as managing batch size, batch timeout, and index names for the enrichment topology configuration
 * Threat Triage functions - Functions surrounding adding, viewing and removing threat triage functions.
 
 ### Grok Functions
@@ -169,11 +169,12 @@ The functions are split roughly into a few sections:
 ### Indexing Functions
 
 * `INDEXING_SET_BATCH`
-  * Description: Set batch size
+  * Description: Set batch size and timeout
   * Input:
     * sensorConfig - Sensor config to add transformation to.
     * writer - The writer to update (e.g. elasticsearch, solr or hdfs)
     * size - batch size (integer), defaults to 1, meaning batching disabled
+    * timeout - (optional) batch timeout in seconds (integer), defaults to 0, meaning system default
   * Returns: The String representation of the config in zookeeper
 * `INDEXING_SET_ENABLED`
   * Description: Enable or disable an indexing writer for a sensor.
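
For reference, an illustrative call of INDEXING_SET_BATCH using the new optional timeout argument (the writer name and values here are arbitrary; compare the new testSetBatchWithTimeout case further below):

    INDEXING_SET_BATCH(sensorConfig, 'elasticsearch', 100, 5)

This would set batchSize to 100 and batchTimeout to 5 seconds for the elasticsearch writer in the given sensor config.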

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
index 128daea..235044f 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
@@ -40,10 +40,11 @@ public class IndexingConfigFunctions {
   @Stellar(
            namespace = "INDEXING"
           ,name = "SET_BATCH"
-          ,description = "Set batch size"
+          ,description = "Set batch size and timeout"
           ,params = {"sensorConfig - Sensor config to add transformation to."
                     ,"writer - The writer to update (e.g. elasticsearch, solr 
or hdfs)"
                     ,"size - batch size (integer), defaults to 1, meaning 
batching disabled"
+                    ,"timeout - (optional) batch timeout in seconds (integer), 
defaults to 0, meaning system default"
                     }
           ,returns = "The String representation of the config in zookeeper"
           )
@@ -78,6 +79,11 @@ public class IndexingConfigFunctions {
         }
       }
       configObj.put(writer, IndexingConfigurations.setBatchSize((Map<String, Object>) configObj.get(writer), batchSize));
+      int batchTimeout = 0;
+      if(args.size() > 3) {
+        batchTimeout = ConversionUtils.convert(args.get(i++), Integer.class);
+      }
+      configObj.put(writer, IndexingConfigurations.setBatchTimeout((Map<String, Object>) configObj.get(writer), batchTimeout));
       try {
         return JSONUtils.INSTANCE.toJSON(configObj, true);
       } catch (JsonProcessingException e) {

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
index bfed827..29c80b8 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
@@ -62,7 +62,17 @@ public class IndexingConfigFunctionsTest {
                              , toMap("config", "{}")
     );
     Map<String, Object> config = (Map<String, Object>)INDEXING.deserialize(out);
-    Assert.assertEquals(IndexingConfigurations.getBatchSize((Map<String, Object>) config.get("hdfs")), 10);
+    Assert.assertEquals(10, IndexingConfigurations.getBatchSize((Map<String, Object>) config.get("hdfs")));
+  }
+
+  @Test
+  public void testSetBatchWithTimeout() {
+    String out = (String) run("INDEXING_SET_BATCH(config, 'hdfs', 10, 2)"
+                             , toMap("config", "{}")
+    );
+    Map<String, Object> config = (Map<String, Object>)INDEXING.deserialize(out);
+    Assert.assertEquals(10, IndexingConfigurations.getBatchSize((Map<String, Object>) config.get("hdfs")));
+    Assert.assertEquals(2, IndexingConfigurations.getBatchTimeout((Map<String, Object>) config.get("hdfs")));
   }
 
   @Test(expected=IllegalStateException.class)

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
index 5f4b3fd..15c84da 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
@@ -159,6 +159,18 @@ public class SimpleHBaseEnrichmentWriterTest {
       }
 
       @Override
+      public int getBatchTimeout(String sensorName) {
+        //TODO - enable unit testing
+        return 0;
+      }
+
+      @Override
+      public List<Integer> getAllConfiguredTimeouts() {
+        //TODO - enable unit testing
+        return new ArrayList<>();
+      }
+
+      @Override
       public String getIndex(String sensorName) {
         return SENSOR_TYPE;
       }

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
index 8e351ff..f270d97 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
@@ -29,12 +29,12 @@ import java.util.Set;
 
 public class BaseEnrichmentBoltTest extends BaseBoltTest {
 
-  public String sampleSensorEnrichmentConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "enrichments/test.json";
-  public String sampleSensorIndexingConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "indexing/test.json";
+  public static final String sampleSensorEnrichmentConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "enrichments/test.json";
+  public static final String sampleSensorIndexingConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "indexing/test.json";
   protected Set<String> streamIds = new HashSet<>();
   protected Set<String> joinStreamIds = new HashSet<>();
   protected String key = "someKey";
-  protected String sensorType = "test";
+  public static final String sensorType = "test";
 
   /**
    * {

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index df6ee90..37c624f 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -23,6 +23,7 @@ import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
@@ -38,15 +39,51 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * This component implements message batching, with both flush on queue size, and flush on queue timeout.
+ * There is a queue for each sensorType.
+ * Ideally each queue would have its own timer, but we only have one global timer, the Tick Tuple
+ * generated at fixed intervals by the system and received by the Bolt.  Given this constraint,
+ * we use the following strategy:
+ *   - The default batchTimeout is, as recommended by Storm, 1/2 the Storm 'topology.message.timeout.secs',
+ *   modified by batchTimeoutDivisor, in case multiple batching writers are daisy-chained in one topology.
+ *   - If some sensors configure their own batchTimeouts, they are compared with the default.  Batch
+ *   timeouts greater than the default will be ignored, because they can cause message recycling in Storm.
+ *   Batch timeouts configured to <= zero, or undefined, mean use the default.
+ *   - The *smallest* configured batchTimeout among all sensor types, greater than zero and less than
+ *   the default, will be used to configure the 'topology.tick.tuple.freq.secs' for the Bolt.  If there are no
+ *   valid configured batchTimeouts, the defaultBatchTimeout will be used.
+ *   - The age of the queue is checked every time a sensor message arrives.  Thus, if at least one message
+ *   per second is received for a given sensor, that queue will flush on timeout or sooner, depending on batchSize.
+ *   - On each Tick Tuple received, *all* queues will be checked, and if any are older than their respective
+ *   batchTimeout, they will be flushed.  Note that this does NOT guarantee timely flushing, depending on the
+ *   phase relationship between the queue's batchTimeout and the tick interval.  The maximum age of a queue
+ *   before it flushes is its batchTimeout + the tick interval, which is guaranteed to be less than 2x the
+ *   batchTimeout, and also less than the 'topology.message.timeout.secs'.  This guarantees that the messages
+ *   will not age out of the Storm topology, but it does not guarantee the flush interval requested, for
+ *   sensor types not receiving at least one message every second.
+ *
+ * @param <MESSAGE_T> the type of message being batched and written
+ */
 public class BulkWriterComponent<MESSAGE_T> {
   public static final Logger LOG = LoggerFactory
             .getLogger(BulkWriterComponent.class);
   private Map<String, Collection<Tuple>> sensorTupleMap = new HashMap<>();
   private Map<String, List<MESSAGE_T>> sensorMessageMap = new HashMap<>();
+  private Map<String, long[]> batchTimeoutMap = new HashMap<>();
   private OutputCollector collector;
+  //In test scenarios, defaultBatchTimeout may not be correctly initialized, so do it here.
+  //This is a conservative defaultBatchTimeout for a vanilla bolt with batchTimeoutDivisor=2
+  public static final int UNINITIALIZED_DEFAULT_BATCH_TIMEOUT = 6;
+  private int defaultBatchTimeout = UNINITIALIZED_DEFAULT_BATCH_TIMEOUT;
   private boolean handleCommit = true;
   private boolean handleError = true;
+  private static final int LAST_CREATE_TIME_MS = 0; //index zero'th element of long[] in batchTimeoutMap
+  private static final int TIMEOUT_MS = 1;          //index next element of long[] in batchTimeoutMap
+  private Clock clock = new Clock();
+
   public BulkWriterComponent(OutputCollector collector) {
     this.collector = collector;
   }
@@ -57,6 +94,15 @@ public class BulkWriterComponent<MESSAGE_T> {
     this.handleError = handleError;
   }
 
+  /**
+   * Used only for testing.  Overrides the default (actual) wall clock.
+   * @param clock the Clock to substitute for the wall clock
+   * @return this mutated BulkWriterComponent
+   */
+  public BulkWriterComponent withClock(Clock clock) {
+    this.clock = clock;
+    return this;
+  }
+
   public void commit(Iterable<Tuple> tuples) {
     tuples.forEach(t -> collector.ack(t));
     if(LOG.isDebugEnabled()) {
@@ -92,7 +138,6 @@ public class BulkWriterComponent<MESSAGE_T> {
     return new ArrayList<>();
   }
 
-
   public void errorAll(Throwable e, MessageGetStrategy messageGetStrategy) {
     for(String key : new HashSet<>(sensorTupleMap.keySet())) {
       errorAll(key, e, messageGetStrategy);
@@ -114,55 +159,136 @@ public class BulkWriterComponent<MESSAGE_T> {
                    , MessageGetStrategy messageGetStrategy
                    ) throws Exception
   {
-    if(!configurations.isEnabled(sensorType)) {
+    if (!configurations.isEnabled(sensorType)) {
       collector.ack(tuple);
       return;
     }
     int batchSize = configurations.getBatchSize(sensorType);
+
+    if (batchSize <= 1) { //simple case - no batching, no timeouts
+      Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);  //still 
read in case batchSize changed
+      if (tupleList == null) {
+        tupleList = createTupleCollection();
+      }
+      tupleList.add(tuple);
+
+      List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);  //still 
read in case batchSize changed
+      if (messageList == null) {
+        messageList = new ArrayList<>();
+      }
+      messageList.add(message);
+
+      flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
+      return;
+    }
+
+    //Otherwise do the full batch buffering with timeouts
+    long[] batchTimeoutInfo = batchTimeoutMap.get(sensorType);
+    if (batchTimeoutInfo == null) {
+      //lazily create the batchTimeoutInfo array, once per sensor.
+      batchTimeoutInfo = new long[] {0L, 0L};
+      batchTimeoutMap.put(sensorType, batchTimeoutInfo);
+    }
+
     Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);
     if (tupleList == null) {
+      //This block executes at the beginning of every batch, per sensor.
       tupleList = createTupleCollection();
+      sensorTupleMap.put(sensorType, tupleList);
+      batchTimeoutInfo[LAST_CREATE_TIME_MS] = clock.currentTimeMillis();
+      //configurations can change, so (re)init getBatchTimeout(sensorType) at start of every batch
+      int batchTimeoutSecs = configurations.getBatchTimeout(sensorType);
+      if (batchTimeoutSecs <= 0 || batchTimeoutSecs > defaultBatchTimeout) {
+        batchTimeoutSecs = defaultBatchTimeout;
+      }
+      batchTimeoutInfo[TIMEOUT_MS] = TimeUnit.SECONDS.toMillis(batchTimeoutSecs);
     }
     tupleList.add(tuple);
+
     List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);
     if (messageList == null) {
       messageList = new ArrayList<>();
+      sensorMessageMap.put(sensorType, messageList);
     }
     messageList.add(message);
 
-    if (tupleList.size() < batchSize) {
-      sensorTupleMap.put(sensorType, tupleList);
-      sensorMessageMap.put(sensorType, messageList);
-    } else {
-      long startTime = System.nanoTime();
-      try {
-        BulkWriterResponse response = bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
-
-        // Commit or error piecemeal.
-        if(handleCommit) {
-          commit(response);
-        }
-
-        if(handleError) {
-          error(sensorType, response, messageGetStrategy);
-        } else if (response.hasErrors()) {
-          throw new IllegalStateException("Unhandled bulk errors in response: 
" + response.getErrors());
-        }
-      } catch (Throwable e) {
-        if(handleError) {
-          error(sensorType, e, tupleList, messageGetStrategy);
-        }
-        else {
-          throw e;
-        }
+    //Check for batchSize flush
+    if (tupleList.size() >= batchSize) {
+      flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
+      return;
+    }
+    //Check for batchTimeout flush (if the tupleList isn't brand new).
+    //Debugging note: If your queue always flushes at length==2 regardless of feed rate,
+    //it may mean defaultBatchTimeout has somehow been set to zero.
+    if (tupleList.size() > 1 && (clock.currentTimeMillis() - batchTimeoutInfo[LAST_CREATE_TIME_MS] >= batchTimeoutInfo[TIMEOUT_MS])) {
+      flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
+      return;
+    }
+  }
+
+  private void flush( String sensorType
+                    , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
+                    , WriterConfiguration configurations
+                               , MessageGetStrategy messageGetStrategy
+                    , Collection<Tuple> tupleList
+                    , List<MESSAGE_T> messageList
+                    ) throws Exception
+  {
+    long startTime = System.currentTimeMillis(); //no need to mock, so use real time
+    try {
+      BulkWriterResponse response = bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
+
+      // Commit or error piecemeal.
+      if(handleCommit) {
+        commit(response);
       }
-      finally {
-        sensorTupleMap.remove(sensorType);
-        sensorMessageMap.remove(sensorType);
+
+      if(handleError) {
+        error(sensorType, response, messageGetStrategy);
+      } else if (response.hasErrors()) {
+        throw new IllegalStateException("Unhandled bulk errors in response: " 
+ response.getErrors());
+      }
+    } catch (Throwable e) {
+      if(handleError) {
+        error(sensorType, e, tupleList, messageGetStrategy);
+      }
+      else {
+        throw e;
+      }
+    }
+    finally {
+      sensorTupleMap.remove(sensorType);
+      sensorMessageMap.remove(sensorType);
+    }
+    long endTime = System.currentTimeMillis();
+    long elapsed = endTime - startTime;
+    LOG.debug("Bulk batch for sensor {} completed in ~{} ns", sensorType, 
elapsed);
+  }
+
+  // Flushes all queues older than their batchTimeouts.
+  public void flushTimeouts(
+            BulkMessageWriter<MESSAGE_T> bulkMessageWriter
+          , WriterConfiguration configurations
+          , MessageGetStrategy messageGetStrategy
+          ) throws Exception
+  {
+    // No need to do "all" sensorTypes here, just the ones that have data 
batched up.
+    // Note queues with batchSize == 1 don't get batched, so they never 
persist in the sensorTupleMap.
+    for (String sensorType : new HashSet<>(sensorTupleMap.keySet())) { //snapshot the keys; flush() removes map entries
+      long[] batchTimeoutInfo = batchTimeoutMap.get(sensorType);
+      if (batchTimeoutInfo == null  //Shouldn't happen, but conservatively flush if so
+          || clock.currentTimeMillis() - batchTimeoutInfo[LAST_CREATE_TIME_MS] >= batchTimeoutInfo[TIMEOUT_MS]) {
+        flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy
+                   , sensorTupleMap.get(sensorType), sensorMessageMap.get(sensorType));
       }
-      long endTime = System.nanoTime();
-      long elapsed = endTime - startTime;
-      LOG.debug("Bulk batch completed in ~{} ns", elapsed);
     }
   }
+
+  /**
+   * Sets the default batchTimeout, used when a sensor has no (or an over-large) configured batchTimeout.
+   * @param defaultBatchTimeout the default batchTimeout in seconds, as derived by BatchTimeoutHelper
+   */
+  public void setDefaultBatchTimeout(int defaultBatchTimeout) {
+    this.defaultBatchTimeout = defaultBatchTimeout;
+  }
 }
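
The withClock(Clock) hook above exists so tests can advance time without sleeping through real batchTimeouts.  What follows is a minimal sketch of that pattern, not part of this patch: the SettableClock class, the collaborator names in the comments, and the 6 second / 7000 ms figures are illustrative assumptions (6 seconds is the conservative default for a vanilla bolt with batchTimeoutDivisor=2), and Clock.currentTimeMillis() is assumed to be overridable.

// Sketch only: a hand-rolled controllable clock for driving flushTimeouts() in a test.
import org.apache.metron.common.system.Clock;

public class SettableClock extends Clock {
  private long now = 0L;
  public void elapse(long millis) { now += millis; }      // advance simulated time
  @Override
  public long currentTimeMillis() { return now; }         // what BulkWriterComponent reads
}

// Typical use in a test (the collaborator variables are placeholders):
//   SettableClock clock = new SettableClock();
//   BulkWriterComponent<JSONObject> component =
//       new BulkWriterComponent<JSONObject>(collector).withClock(clock);
//   component.setDefaultBatchTimeout(6);
//   ... enqueue a few messages (fewer than batchSize) through the component's write path ...
//   clock.elapse(7000L);   // jump past the 6-second batchTimeout
//   component.flushTimeouts(bulkMessageWriter, writerConfiguration, messageGetStrategy);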

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
new file mode 100644
index 0000000..22440ad
--- /dev/null
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Routines to help figure out the effective batchTimeout(s), using information from
+ * multiple configuration sources, topology.message.timeout.secs, and batchTimeoutDivisor,
+ * and to use them to calculate defaultBatchTimeout and an appropriate topology.tick.tuple.freq.secs.
+ *
+ * These methods cause no side effects outside of setting the internal member variables.
+ * "base" config values come from defaults and storm.yaml (subordinate to Component config settings).
+ * "cli" config values come from CLI arguments (superior to Component config settings).
+ * 0 or Integer.MAX_VALUE means disabled, except in batchTimeout values, where
+ * 0 means use the default.
+ *
+ * These lookups are fairly expensive, and changing the result values currently requires
+ * a restart of the Storm topology anyway, so this implementation caches its results after
+ * the first read.  If you want different results, you'll need a new instance or a restart.
+ */
+public class BatchTimeoutHelper {
+
+  private static final Logger LOG = LoggerFactory
+          .getLogger(BatchTimeoutHelper.class);
+  private boolean initialized = false;
+  private Supplier<List<Integer>> listAllConfiguredTimeouts;
+  protected int batchTimeoutDivisor;
+  protected int baseMessageTimeoutSecs;
+  protected int cliMessageTimeoutSecs;
+  protected int baseTickTupleFreqSecs;
+  protected int cliTickTupleFreqSecs;
+  protected int effectiveMessageTimeoutSecs;
+  protected int maxBatchTimeoutAllowedSecs; //derived from MessageTimeoutSecs value
+  protected int minBatchTimeoutRequestedSecs; //min of all sensorType configured batchTimeout requests
+  protected int recommendedTickIntervalSecs;  //the answer
+
+  BatchTimeoutHelper( Supplier<List<Integer>> listAllConfiguredTimeouts
+          , int batchTimeoutDivisor
+          )
+  {
+    // The two arguments to the constructor are information only available at the Bolt object (batchTimeoutDivisor)
+    // and WriterConfiguration object (listAllConfiguredTimeouts).
+    this.batchTimeoutDivisor = batchTimeoutDivisor;
+    this.listAllConfiguredTimeouts = listAllConfiguredTimeouts;
+    // Reads and calculations are deferred until first call, then frozen for the duration of this BatchTimeoutHelper instance.
+  }
+
+  private synchronized void init() {
+    if (initialized) return;
+    readGlobalTimeoutConfigs();
+    calcMaxBatchTimeoutAllowed();
+    readMinBatchTimeoutRequested();
+    calcRecommendedTickInterval();
+    initialized = true;
+  }
+
+  //modified from Utils.readStormConfig()
+  private Map readStormConfigWithoutCLI() {
+    Map ret = Utils.readDefaultConfig();
+    String confFile = System.getProperty("storm.conf.file");
+    Map storm;
+    if (confFile == null || confFile.equals("")) {
+      storm = Utils.findAndReadConfigFile("storm.yaml", false);
+    } else {
+      storm = Utils.findAndReadConfigFile(confFile, true);
+    }
+    ret.putAll(storm);
+    return ret;
+  }
+
+  private void readGlobalTimeoutConfigs() {
+    Map stormConf = readStormConfigWithoutCLI();
+    Map cliConf   = Utils.readCommandLineOpts();
+    //parameter TOPOLOGY_MESSAGE_TIMEOUT_SECS is declared @isInteger and @NotNull in storm-core (org.apache.storm.Config)
+    baseMessageTimeoutSecs =
+            (Integer) stormConf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 0);
+    cliMessageTimeoutSecs =
+            (Integer)   cliConf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 0);
+    //parameter TOPOLOGY_TICK_TUPLE_FREQ_SECS is only declared @isInteger, and may in fact return null
+    Object scratch;
+    scratch = stormConf.getOrDefault(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 0);
+    baseTickTupleFreqSecs = (scratch == null) ? 0 : (Integer) scratch;
+    scratch =   cliConf.getOrDefault(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 0);
+    cliTickTupleFreqSecs  = (scratch == null) ? 0 : (Integer) scratch;
+  }
+
+  private void calcMaxBatchTimeoutAllowed() {
+    // The max batchTimeout allowed also becomes the default batchTimeout.
+    effectiveMessageTimeoutSecs = (cliMessageTimeoutSecs == 0 ? baseMessageTimeoutSecs : cliMessageTimeoutSecs);
+    if (effectiveMessageTimeoutSecs == 0) {
+      LOG.info("topology.message.timeout.secs is disabled in both Storm config and CLI.  Allowing unlimited batchTimeouts.");
+      maxBatchTimeoutAllowedSecs = Integer.MAX_VALUE;
+    }
+    else {
+      //Recommended value for safe batchTimeout is 1/2 * TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+      //We further divide this by batchTimeoutDivisor for the particular Writer Bolt we are in,
+      //and subtract a delta of 1 second for surety (as well as rounding down).
+      //So if everything is defaulted, maxBatchTimeoutAllowedSecs is 14.
+      maxBatchTimeoutAllowedSecs = effectiveMessageTimeoutSecs / 2 / batchTimeoutDivisor - 1;
+      if (maxBatchTimeoutAllowedSecs <= 0) { //disallowed, and shouldn't happen with reasonable configs
+        maxBatchTimeoutAllowedSecs = 1;
+      }
+    }
+  }
+
+  /**
+   * @return the max batchTimeout allowed, in seconds
+   * Guaranteed positive number.
+   */
+  protected int getDefaultBatchTimeout() {
+    if (!initialized) {this.init();}
+    return maxBatchTimeoutAllowedSecs;
+  }
+
+  private void readMinBatchTimeoutRequested() {
+    // The knowledge of how to list the currently configured batchTimeouts
+    // is delegated to the WriterConfiguration for the bolt that called us.
+    List<Integer> configuredTimeouts = listAllConfiguredTimeouts.get();
+
+    // Discard non-positive values, which mean "use default"
+    int minval = Integer.MAX_VALUE;
+    for (int k : configuredTimeouts) {
+      if (k < minval && k > 0) minval = k;
+    }
+    minBatchTimeoutRequestedSecs = minval;
+  }
+
+  private void calcRecommendedTickInterval() {
+    recommendedTickIntervalSecs = Integer.min(minBatchTimeoutRequestedSecs, maxBatchTimeoutAllowedSecs);
+    //If needed, we can +=1 to assure triggering on each cycle, but this shouldn't be necessary.
+    //Note that this strategy means that sensors with batchTimeout requested less frequently
+    //may now have latency of "their requested batchTimeout" + "this recommended tick interval",
+    //in the worst case.
+  }
+
+  /**
+   * @return the recommended TickInterval to request, in seconds
+   * Guaranteed positive number.
+   */
+  protected int getRecommendedTickInterval() {
+    if (!initialized) {this.init();}
+    // Remember that parameter settings in the CLI override parameter settings set by the Storm component.
+    // We shouldn't have to deal with this in the Metron environment, but just in case,
+    // warn if our recommended value will be overridden by cliTickTupleFreqSecs.
+    if (cliTickTupleFreqSecs > 0 && cliTickTupleFreqSecs > recommendedTickIntervalSecs) {
+      LOG.warn("Parameter '" + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + "' has been forced to value '" +
+              Integer.toString(cliTickTupleFreqSecs) + "' via CLI configuration.  This will override the desired " +
+              "setting of '" + Integer.toString(recommendedTickIntervalSecs) +
+              "' and may lead to delayed batch flushing.");
+    }
+    if (cliTickTupleFreqSecs > 0 && cliTickTupleFreqSecs < recommendedTickIntervalSecs) {
+      LOG.info("Parameter '" + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + "' has been forced to value '" +
+              Integer.toString(cliTickTupleFreqSecs) + "' via CLI configuration.  This will override the desired " +
+              "setting of '" + Integer.toString(recommendedTickIntervalSecs) +
+              "' and may lead to unexpected periodicity in batch flushing.");
+    }
+    return recommendedTickIntervalSecs;
+  }
+}
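
To make the arithmetic above concrete, here is a hedged sketch assuming the stock Storm default of topology.message.timeout.secs = 30 and no CLI overrides.  The class name is invented for illustration, and it sits in the same package only because the constructor and getters are not public; the expected values line up with the unit tests below.

// Illustrative only, not part of this patch.
package org.apache.metron.writer.bolt;   // same package: the constructor and getters are not public

import java.util.Arrays;

public class BatchTimeoutArithmeticSketch {
  public static void main(String[] args) {
    // With topology.message.timeout.secs = 30 and batchTimeoutDivisor = 2:
    //   maxBatchTimeoutAllowedSecs = 30 / 2 / 2 - 1 = 6
    BatchTimeoutHelper bth =
        new BatchTimeoutHelper(() -> Arrays.asList(5, 2, 4, 6), 2);
    System.out.println(bth.getDefaultBatchTimeout());      // prints 6
    System.out.println(bth.getRecommendedTickInterval());  // min(6, 2) = 2
  }
}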

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 6e0c371..9b4a6df 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -23,11 +23,13 @@ import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.writer.BulkWriterComponent;
 import org.apache.metron.writer.WriterToBulkWriter;
+import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -37,20 +39,27 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
 
+import static org.apache.storm.utils.TupleUtils.isTick;
+
 public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
 
   private static final Logger LOG = LoggerFactory
           .getLogger(BulkMessageWriterBolt.class);
   private BulkMessageWriter<JSONObject> bulkMessageWriter;
-  private BulkWriterComponent<JSONObject> writerComponent;
+  private BulkWriterComponent<JSONObject> writerComponent = null;
  private String messageGetStrategyType = MessageGetters.DEFAULT_JSON_FROM_FIELD.name();
   private String messageGetField;
   private transient MessageGetStrategy messageGetStrategy;
   private transient OutputCollector collector;
-  private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation;
+  private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation = null;
+  private int requestedTickFreqSecs;
+  private int defaultBatchTimeout;
+  private int batchTimeoutDivisor = 1;
+
   public BulkMessageWriterBolt(String zookeeperUrl) {
     super(zookeeperUrl);
   }
@@ -75,6 +84,82 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
     return this;
   }
 
+  /**
+   * If this BulkMessageWriterBolt is in a topology where it is daisy-chained with
+   * other queuing Writers, then the max amount of time it takes for a tuple
+   * to clear the whole topology is the sum of all the batchTimeouts for all the
+   * daisy-chained Writers.  In the common case where each Writer is using the default
+   * batchTimeout, it is then necessary to divide that batchTimeout by the number of
+   * daisy-chained Writers.  There are no examples of daisy-chained batching Writers
+   * in the current Metron topologies, but the feature is available as a "fluent"-style
+   * mutator if needed.  It would be used in the topology set-up files such as
+   * metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
+   * Default value, if not otherwise set, is 1.
+   *
+   * If non-default batchTimeouts are configured for some components, the administrator
+   * may want to take this behavior into account.
+   *
+   * @param batchTimeoutDivisor the number of daisy-chained batching Writers in the topology; must be positive
+   * @return this mutated BulkMessageWriterBolt
+   */
+  public BulkMessageWriterBolt withBatchTimeoutDivisor(int batchTimeoutDivisor) {
+    if (batchTimeoutDivisor <= 0) {
+      throw new IllegalArgumentException(String.format("batchTimeoutDivisor must be positive. Value provided was %s", batchTimeoutDivisor));
+    }
+    this.batchTimeoutDivisor = batchTimeoutDivisor;
+    return this;
+  }
+
+  /**
+   * Used only for unit testing
+   * @param defaultBatchTimeout
+   */
+  protected void setDefaultBatchTimeout(int defaultBatchTimeout) {
+    this.defaultBatchTimeout = defaultBatchTimeout;
+  }
+
+  /**
+   * Used only for unit testing
+   */
+  public int getDefaultBatchTimeout() {
+    return defaultBatchTimeout;
+  }
+
+  /**
+   * This method is called by TopologyBuilder.createTopology() to obtain topology and
+   * bolt specific configuration parameters.  We use it primarily to configure how often
+   * a tick tuple will be sent to our bolt.
+   * @return the component configuration, including the requested topology.tick.tuple.freq.secs
+   */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    // This is called long before prepare(), so do some of the same stuff as prepare() does,
+    // to get the valid WriterConfiguration.  But don't store any non-serializable objects,
+    // else Storm will throw a runtime error.
+    Function<WriterConfiguration, WriterConfiguration> configurationXform;
+    if(bulkMessageWriter instanceof WriterToBulkWriter) {
+      configurationXform = WriterToBulkWriter.TRANSFORMATION;
+    }
+    else {
+      configurationXform = x -> x;
+    }
+    WriterConfiguration writerconf = configurationXform.apply(
+            new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
+
+    BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
+    this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval();
+    //And while we've got BatchTimeoutHelper handy, capture the defaultBatchTimeout for writerComponent.
+    this.defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+
+    Map<String, Object> conf = super.getComponentConfiguration();
+    if (conf == null) {
+      conf = new HashMap<String, Object>();
+    }
+    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, requestedTickFreqSecs);
+    LOG.info("Requesting " + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + " set to " 
+ Integer.toString(requestedTickFreqSecs));
+    return conf;
+  }
+
   @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     this.writerComponent = new BulkWriterComponent<>(collector);
@@ -92,24 +177,58 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
       configurationTransformation = x -> x;
     }
     try {
-      bulkMessageWriter.init(stormConf,
-                             context,
-                             configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(),
-                             getConfigurations()))
-                            );
+      WriterConfiguration writerconf = configurationTransformation.apply(
+              new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
+      if (defaultBatchTimeout == 0) {
+        //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
+        //probably because we are in a unit test scenario.  So calculate it here.
+        BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
+        defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+      }
+      writerComponent.setDefaultBatchTimeout(defaultBatchTimeout);
+      bulkMessageWriter.init(stormConf, context, writerconf);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
+  /**
+   * Used only for unit testing.
+   */
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, Clock clock) {
+    prepare(stormConf, context, collector);
+    writerComponent.withClock(clock);
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
-    JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
-    String sensorType = MessageUtils.getSensorType(message);
+    if (isTick(tuple)) {
+      try {
+        if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
+          //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
+          LOG.debug("Flushing message queues older than their batchTimeouts");
+          writerComponent.flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
+                  new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
+                  , messageGetStrategy);
+        }
+      }
+      catch(Exception e) {
+        throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
+      }
+      finally {
+        collector.ack(tuple);
+      }
+      return;
+    }
+
     try
     {
-      WriterConfiguration writerConfiguration = configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
+      JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
+      String sensorType = MessageUtils.getSensorType(message);
+      LOG.trace("Writing enrichment message: {}", message);
+      WriterConfiguration writerConfiguration = configurationTransformation.apply(
+              new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
       if(writerConfiguration.isDefault(sensorType)) {
         //want to warn, but not fail the tuple
         collector.reportError(new Exception("WARNING: Default and (likely) 
unoptimized writer config used for " + bulkMessageWriter.getName() + " writer 
and sensor " + sensorType));
@@ -121,7 +240,6 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
                            , writerConfiguration
                            , messageGetStrategy
                            );
-      LOG.trace("Writing enrichment message: {}", message);
     }
     catch(Exception e) {
       throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
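
For completeness, a hedged sketch of how the new withBatchTimeoutDivisor knob could be applied when wiring the bolt directly in Java.  Metron's shipped topologies are configured through flux YAML (such as the remote.yaml referenced in the Javadoc above), so the class, method, and zookeeperUrl parameter here are illustrative only.

// Illustrative wiring only; real Metron topologies configure this via flux YAML.
import org.apache.metron.writer.bolt.BulkMessageWriterBolt;

public class IndexingBoltWiringSketch {
  public static BulkMessageWriterBolt buildIndexingBolt(String zookeeperUrl) {
    // With two daisy-chained batching writers, each gets half of the usual batchTimeout
    // budget, keeping the sum safely under topology.message.timeout.secs.
    return new BulkMessageWriterBolt(zookeeperUrl)
        .withBatchTimeoutDivisor(2);
  }
}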

http://git-wip-us.apache.org/repos/asf/metron/blob/f072ed23/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java
new file mode 100644
index 0000000..6d9b62e
--- /dev/null
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for BatchTimeoutHelper, run against the default Storm configuration.
+ */
+public class BatchTimeoutHelperTest {
+  private final TimeoutListSupplier defaultConfigList = new TimeoutListSupplier(Arrays.asList());
+  private final TimeoutListSupplier disabledConfigList = new TimeoutListSupplier(Arrays.asList(0,0));
+  private final TimeoutListSupplier smallTimeoutsList = new TimeoutListSupplier(Arrays.asList(5, 2, 4, 6));
+  private final TimeoutListSupplier largeTimeoutsList = new TimeoutListSupplier(Arrays.asList(100, 200, 150, 500));
+  private final TimeoutListSupplier illegalTimeoutsList = new TimeoutListSupplier(Arrays.asList(5, 2, -3, 6));
+
+  @Test
+  public void testGetDefaultBatchTimeout() throws Exception {
+    //The defaultBatchTimeout is dependent only on batchTimeoutDivisor and the Storm config
+    //and CLI overrides, which aren't of interest here.
+    assertEquals(30, Utils.readStormConfig().getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 0));
+    BatchTimeoutHelper bth;
+    bth = new BatchTimeoutHelper(defaultConfigList, 1);
+    assertEquals(14, bth.getDefaultBatchTimeout());
+    bth = new BatchTimeoutHelper(defaultConfigList, 2);
+    assertEquals(6, bth.getDefaultBatchTimeout());
+    bth = new BatchTimeoutHelper(defaultConfigList, 3);
+    assertEquals(4, bth.getDefaultBatchTimeout());
+    bth = new BatchTimeoutHelper(defaultConfigList, 4);
+    assertEquals(2, bth.getDefaultBatchTimeout());
+    bth = new BatchTimeoutHelper(defaultConfigList, 6);
+    assertEquals(1, bth.getDefaultBatchTimeout());
+    bth = new BatchTimeoutHelper(defaultConfigList, 20);
+    assertEquals(1, bth.getDefaultBatchTimeout());
+
+    bth = new BatchTimeoutHelper(disabledConfigList, 2);
+    assertEquals(6, bth.getDefaultBatchTimeout());
+    bth = new BatchTimeoutHelper(smallTimeoutsList, 2);
+    assertEquals(6, bth.getDefaultBatchTimeout());
+  }
+
+  @Test
+  public void testGetRecommendedTickInterval() throws Exception {
+    //The recommendedTickInterval is the min of defaultBatchTimeout and the configured TimeoutsList.
+    BatchTimeoutHelper bth;
+    bth = new BatchTimeoutHelper(defaultConfigList, 2);
+    assertEquals(6, bth.getRecommendedTickInterval());
+    bth = new BatchTimeoutHelper(disabledConfigList, 2);
+    assertEquals(6, bth.getRecommendedTickInterval());
+    bth = new BatchTimeoutHelper(largeTimeoutsList, 2);
+    assertEquals(6, bth.getRecommendedTickInterval());
+    bth = new BatchTimeoutHelper(smallTimeoutsList, 2);
+    assertEquals(2, bth.getRecommendedTickInterval());
+    bth = new BatchTimeoutHelper(illegalTimeoutsList, 2);
+    assertEquals(2, bth.getRecommendedTickInterval());
+  }
+
+  public static class TimeoutListSupplier implements Supplier<List<Integer>> {
+    private List<Integer> list;
+    public TimeoutListSupplier(List<Integer> list) {
+      this.list = list;
+    }
+    @Override
+    public List<Integer> get() {
+      return list;
+    }
+  }
+
+}
