http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/StandAloneProfilerTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/StandAloneProfilerTest.java
 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/StandAloneProfilerTest.java
new file mode 100644
index 0000000..2269c86
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/StandAloneProfilerTest.java
@@ -0,0 +1,255 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the StandAloneProfiler class.
+ */
+public class StandAloneProfilerTest {
+
+  /**
+   * {
+   *   "profiles": [
+   *   ]
+   * }
+   */
+  @Multiline
+  private String noProfiles;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "'global'",
+   *        "init": { "count": 0 },
+   *        "update": { "count": "count + 1" },
+   *        "result": "count"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String oneProfile;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "'global1'",
+   *        "result": "'result'"
+   *      },
+   *      {
+   *        "profile": "profile2",
+   *        "foreach": "'global2'",
+   *        "result": "'result'"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String twoProfiles;
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.1",
+   *   "ip_dst_addr": "10.0.0.20",
+   *   "protocol": "HTTP",
+   *   "timestamp": 2222222222222,
+   * }
+   */
+  @Multiline
+  private String messageJson;
+
+  private JSONObject message;
+
+  private long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
+
+  private Context context = Context.EMPTY_CONTEXT();
+
+  @Before
+  public void setup() throws Exception {
+
+    // parse the input message
+    JSONParser parser = new JSONParser();
+    message = (JSONObject) parser.parse(messageJson);
+  }
+
+  @Test
+  public void testWithOneProfile() throws Exception {
+
+    StandAloneProfiler profiler = createProfiler(oneProfile);
+    profiler.apply(message);
+    profiler.apply(message);
+    profiler.apply(message);
+
+    List<ProfileMeasurement> measurements = profiler.flush();
+    assertEquals(1, measurements.size());
+
+    // expect 1 measurement for the 1 profile that has been defined
+    ProfileMeasurement m = measurements.get(0);
+    assertEquals("profile1", m.getProfileName());
+    assertEquals(3, m.getProfileValue());
+  }
+
+
+  @Test
+  public void testWithTwoProfiles() throws Exception {
+
+    StandAloneProfiler profiler = createProfiler(twoProfiles);
+    profiler.apply(message);
+    profiler.apply(message);
+    profiler.apply(message);
+
+    List<ProfileMeasurement> measurements = profiler.flush();
+    assertEquals(2, measurements.size());
+
+    // expect 2 measurements, 1 for each profile
+    List<String> expected = Arrays.asList(new String[] { "profile1", 
"profile2" });
+    {
+      ProfileMeasurement m = measurements.get(0);
+      assertTrue(expected.contains(m.getProfileName()));
+      assertEquals("result", m.getProfileValue());
+    }
+    {
+      ProfileMeasurement m = measurements.get(1);
+      assertTrue(expected.contains(m.getProfileName()));
+      assertEquals("result", m.getProfileValue());
+    }
+  }
+
+  /**
+   * The message count and route count will always be equal, if there is only 
one
+   * profile defined.  The message count and route count can be different when 
there
+   * are multiple profiles defined that each use the same message.
+   */
+  @Test
+  public void testRouteAndMessageCounters() throws Exception {
+    {
+      StandAloneProfiler profiler = createProfiler(noProfiles);
+
+      profiler.apply(message);
+      assertEquals(1, profiler.getMessageCount());
+      assertEquals(0, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(2, profiler.getMessageCount());
+      assertEquals(0, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(3, profiler.getMessageCount());
+      assertEquals(0, profiler.getRouteCount());
+    }
+    {
+      StandAloneProfiler profiler = createProfiler(oneProfile);
+
+      profiler.apply(message);
+      assertEquals(1, profiler.getMessageCount());
+      assertEquals(1, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(2, profiler.getMessageCount());
+      assertEquals(2, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(3, profiler.getMessageCount());
+      assertEquals(3, profiler.getRouteCount());
+    }
+    {
+      StandAloneProfiler profiler = createProfiler(twoProfiles);
+
+      profiler.apply(message);
+      assertEquals(1, profiler.getMessageCount());
+      assertEquals(2, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(2, profiler.getMessageCount());
+      assertEquals(4, profiler.getRouteCount());
+
+      profiler.apply(message);
+      assertEquals(3, profiler.getMessageCount());
+      assertEquals(6, profiler.getRouteCount());
+    }
+  }
+
+  @Test
+  public void testProfileCount() throws Exception {
+    {
+      StandAloneProfiler profiler = createProfiler(noProfiles);
+      assertEquals(0, profiler.getProfileCount());
+    }
+    {
+      StandAloneProfiler profiler = createProfiler(oneProfile);
+      assertEquals(1, profiler.getProfileCount());
+    }
+    {
+      StandAloneProfiler profiler = createProfiler(twoProfiles);
+      assertEquals(2, profiler.getProfileCount());
+    }
+  }
+
+  /**
+   * Creates a ProfilerConfig based on a string containing JSON.
+   *
+   * @param configAsJSON The config as JSON.
+   * @return The ProfilerConfig.
+   * @throws Exception
+   */
+  private ProfilerConfig toProfilerConfig(String configAsJSON) throws 
Exception {
+
+    InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8"));
+    return JSONUtils.INSTANCE.load(in, ProfilerConfig.class);
+  }
+
+  /**
+   * Creates the StandAloneProfiler
+   *
+   * @param profileJson The Profiler configuration to use as a String 
containing JSON.
+   * @throws Exception
+   */
+  private StandAloneProfiler createProfiler(String profileJson) throws 
Exception {
+
+    // the TTL and max routes need not be bounded
+    long profileTimeToLiveMillis = Long.MAX_VALUE;
+    long maxNumberOfRoutes = Long.MAX_VALUE;
+
+    ProfilerConfig config = toProfilerConfig(profileJson);
+    return new StandAloneProfiler(config, periodDurationMillis, 
profileTimeToLiveMillis, maxNumberOfRoutes, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/DefaultClockFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/DefaultClockFactoryTest.java
 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/DefaultClockFactoryTest.java
new file mode 100644
index 0000000..c99b401
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/DefaultClockFactoryTest.java
@@ -0,0 +1,75 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+
+/**
+ * Tests the DefaultClockFactory.
+ */
+public class DefaultClockFactoryTest {
+
+  /**
+   * The object under test.
+   */
+  private DefaultClockFactory clockFactory;
+
+  @Before
+  public void setup() {
+    clockFactory = new DefaultClockFactory();
+  }
+
+  /**
+   * When a 'timestampField' is defined the factory should return a clock
+   * that deals with event time.
+   */
+  @Test
+  public void testCreateEventTimeClock() {
+
+    // configure the profiler to use event time
+    ProfilerConfig config = new ProfilerConfig();
+    config.setTimestampField(Optional.of("timestamp"));
+
+    // the factory should return a clock that handles 'event time'
+    Clock clock = clockFactory.createClock(config);
+    assertTrue(clock instanceof EventTimeClock);
+  }
+
+  /**
+   * When a 'timestampField' is defined the factory should return a clock
+   * that deals with processing time.
+   */
+  @Test
+  public void testCreateProcessingTimeClock() {
+
+    // the profiler uses processing time by default
+    ProfilerConfig config = new ProfilerConfig();
+
+    // the factory should return a clock that handles 'processing time'
+    Clock clock = clockFactory.createClock(config);
+    assertTrue(clock instanceof WallClock);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeClockTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeClockTest.java
 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeClockTest.java
new file mode 100644
index 0000000..0397250
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeClockTest.java
@@ -0,0 +1,115 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class EventTimeClockTest {
+
+  private final String timestampField = "timestamp";
+
+  public JSONObject createMessage() {
+    return new JSONObject();
+  }
+
+  /**
+   * The event time should be extracted from a field contained within a 
message.
+   */
+  @Test
+  public void testEventTime() {
+
+    JSONObject message = createMessage();
+
+    // add a field containing a timestamp to the message
+    final Long timestamp = System.currentTimeMillis();
+    message.put(timestampField, timestamp);
+
+    // what time is it?
+    EventTimeClock clock = new EventTimeClock(timestampField);
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // validate
+    assertTrue(result.isPresent());
+    assertEquals(timestamp, result.get());
+  }
+
+  /**
+   * If the timestamp field is a String, it should be converted to Long and 
used as-is.
+   */
+  @Test
+  public void testEventTimeWithString() {
+    JSONObject message = createMessage();
+
+    // the timestamp field is a string
+    final Long timestamp = System.currentTimeMillis();
+    message.put(timestampField, timestamp.toString());
+
+    // what time is it?
+    EventTimeClock clock = new EventTimeClock(timestampField);
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // validate
+    assertTrue(result.isPresent());
+    assertEquals(timestamp, result.get());
+  }
+
+  /**
+   * If the message does not contain the timestamp field, then nothing should 
be returned.
+   */
+  @Test
+  public void testMissingTimestampField() {
+
+    // no timestamp added to the message
+    JSONObject message = createMessage();
+
+    // what time is it?
+    EventTimeClock clock = new EventTimeClock(timestampField);
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // validate
+    assertFalse(result.isPresent());
+  }
+
+  /**
+   * No timestamp should be returned if the value stored in the timestamp field
+   * cannot be coerced into a valid timestamp.
+   */
+  @Test
+  public void testInvalidValue() {
+
+    // create a message with an invalid value stored in the timestamp field
+    JSONObject message = createMessage();
+    message.put(timestampField, "invalid-timestamp-value");
+
+    // what time is it?
+    EventTimeClock clock = new EventTimeClock(timestampField);
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // no value should be returned
+    assertFalse(result.isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/WallClockTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/WallClockTest.java
 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/WallClockTest.java
new file mode 100644
index 0000000..76b2d7b
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/WallClockTest.java
@@ -0,0 +1,54 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+
+public class WallClockTest {
+
+  public JSONObject createMessage() {
+    return new JSONObject();
+  }
+
+  /**
+   * The wall clock time ALWAYS comes from the system clock.
+   */
+  @Test
+  public void testCurrentTimeMillis() {
+
+    JSONObject message = createMessage();
+    long before = System.currentTimeMillis();
+
+    // what time is it?
+    WallClock clock = new WallClock();
+    Optional<Long> result = clock.currentTimeMillis(message);
+
+    // validate
+    long after = System.currentTimeMillis();
+    assertTrue(result.isPresent());
+    assertTrue(result.get() >= before);
+    assertTrue(result.get() <= after);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md 
b/metron-analytics/metron-profiler/README.md
index dc5ec07..218ec66 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -328,6 +328,62 @@ Continuing the previous running example, at this point, 
you have seen how your p
 
 ## Anatomy of a Profile
 
+### Profiler
+
+The Profiler configuration contains only two fields; only one of which is 
required.
+
+```
+{
+    "profiles": [
+        { "profile": "one", ... },
+        { "profile": "two", ... }
+    ],
+    "timestampField": "timestamp"
+}
+```
+
+| Name                              |               | Description
+|---                                |---            |---
+| [profiles](#profiles)             | Required      | A list of zero or more 
Profile definitions.
+| [timestampField](#timestampfield) | Optional      | Indicates whether 
processing time or event time should be used. By default, processing time is 
enabled.
+
+
+#### `profiles`
+
+*Required*
+
+A list of zero or more Profile definitions.
+
+#### `timestampField`
+
+*Optional*
+
+Indicates whether processing time or event time is used. By default, 
processing time is enabled.
+
+##### Processing Time
+
+By default, no `timestampField` is defined.  In this case, the Profiler uses 
system time when generating profiles.  This means that the profiles are 
generated based on when the data has been processed by the Profiler.  This is 
also known as 'processing time'.
+
+This is the simplest mode of operation, but has some draw backs.  If the 
Profiler is consuming live data and all is well, the processing and event times 
will likely remain similar and consistent. If processing time diverges from 
event time, then the Profiler will generate skewed profiles. 
+
+There are a few scenarios that might cause skewed profiles when using 
processing time.  For example when a system has undergone a scheduled 
maintenance window and is restarted, a high volume of messages will need to be 
processed by the Profiler. The output of the Profiler might indicate an 
increase in activity during this time, although no change in activity actually 
occurred on the target network. The same situation could occur if an upstream 
system which provides telemetry undergoes an outage.  
+
+[Event Time](#event-time) can be used to mitigate these problems.
+
+##### Event Time
+
+Alternatively, a `timestampField` can be defined.  This must be the name of a 
field contained within the telemetry processed by the Profiler.  The Profiler 
will extract and use the timestamp contained within this field.
+
+* If a message does not contain this field, it will be dropped.
+
+* The field must contain a timestamp in epoch milliseconds expressed as either 
a numeric or string. Otherwise, the message will be dropped.
+
+* The Profiler will use the same field across all telemetry sources and for 
all profiles.
+
+* Be aware of clock skew across telemetry sources.  If your profile is 
processing telemetry from multiple sources where the clock differs 
significantly, the Profiler may assume that some of those messages are late and 
will be ignored.  Adjusting the 
[`profiler.window.duration`](#profilerwindowduration) and 
[`profiler.window.lag`](#profilerwindowlag) can help accommodate skewed clocks. 
+
+### Profiles
+
 A profile definition requires a JSON-formatted set of elements, many of which 
can contain Stellar code.  The specification contains the following elements.  
(For the impatient, skip ahead to the [Examples](#examples).)
 
 | Name                          |               | Description
@@ -466,15 +522,19 @@ The values can be changed on disk and then the Profiler 
topology must be restart
 
 | Setting                                                                      
 | Description
 |---                                                                           
 |---
-| [`profiler.input.topic`](#profilerinputtopic)                                
 | The name of the Kafka topic from which to consume data.
-| [`profiler.output.topic`](#profileroutputtopic)                              
 | The name of the Kafka topic to which profile data is written.  Only used 
with profiles that define the [`triage` result field](#result).
+| [`profiler.input.topic`](#profilerinputtopic)                                
 | The name of the input Kafka topic.
+| [`profiler.output.topic`](#profileroutputtopic)                              
 | The name of the output Kafka topic. 
 | [`profiler.period.duration`](#profilerperiodduration)                        
 | The duration of each profile period.  
-| [`profiler.period.duration.units`](#profilerperioddurationunits)             
 | The units used to specify the 
[`profiler.period.duration`](#profilerperiodduration).  
+| [`profiler.period.duration.units`](#profilerperioddurationunits)             
 | The units used to specify the 
[`profiler.period.duration`](#profilerperiodduration).
+| [`profiler.window.duration`](#profilerwindowduration)                        
 | The duration of each profile window.
+| [`profiler.window.duration.units`](#profilerpwindowdurationunits)            
 | The units used to specify the 
[`profiler.window.duration`](#profilerwindowduration).
+| [`profiler.window.lag`](#profilerwindowlag)                                  
 | The maximum time lag for timestamps.
+| [`profiler.window.lag.units`](#profilerpwindowlagunits)                      
 | The units used to specify the [`profiler.window.lag`](#profilerwindowlag).
 | [`profiler.workers`](#profilerworkers)                                       
 | The number of worker processes for the topology.
 | [`profiler.executors`](#profilerexecutors)                                   
 | The number of executors to spawn per component.
 | [`profiler.ttl`](#profilerttl)                                               
 | If a message has not been applied to a Profile in this period of time, the 
Profile will be forgotten and its resources will be cleaned up.
 | [`profiler.ttl.units`](#profilerttlunits)                                    
 | The units used to specify the `profiler.ttl`.
-| [`profiler.hbase.salt.divisor`](#profilerhbasesaltdivisor)                   
 | A salt is prepended to the row key to help prevent hotspotting.
+| [`profiler.hbase.salt.divisor`](#profilerhbasesaltdivisor)                   
 | A salt is prepended to the row key to help prevent hot-spotting.
 | [`profiler.hbase.table`](#profilerhbasetable)                                
 | The name of the HBase table that profiles are written to.
 | [`profiler.hbase.column.family`](#profilerhbasecolumnfamily)                 
 | The column family used to store profiles.
 | [`profiler.hbase.batch`](#profilerhbasebatch)                                
 | The number of puts that are written to HBase in a single batch.
@@ -508,6 +568,36 @@ The units used to specify the `profiler.period.duration`.  
This value should be
 
 *Important*: To read a profile using the Profiler Client, the Profiler 
Client's `profiler.client.period.duration.units` property must match this 
value.  Otherwise, the [Profiler 
Client](metron-analytics/metron-profiler-client) will be unable to read the 
profile data.
 
+### `profiler.window.duration`
+
+*Default*: 30
+
+The duration of each profile window.  Telemetry that arrives within a slice of 
time is processed within a single window.  
+
+Many windows of telemetry will be processed during a single profile period.  
This does not change the output of the Profiler, it only changes how the 
Profiler processes data. The window defines how much data the Profiler 
processes in a single pass.
+
+This value should be defined along with 
[`profiler.window.duration.units`](#profilerwindowdurationunits).
+
+This value must be less than the period duration as defined by 
[`profiler.period.duration`](#profilerperiodduration) and 
[`profiler.period.duration.units`](#profilerperioddurationunits).
+
+### `profiler.window.duration.units`
+
+*Default*: SECONDS
+
+The units used to specify the `profiler.window.duration`.  This value should 
be defined along with [`profiler.window.duration`](#profilerwindowduration).
+
+### `profiler.window.lag`
+
+*Default*: 1
+
+The maximum time lag for timestamps. Timestamps cannot arrive out-of-order by 
more than this amount. This value should be defined along with 
[`profiler.window.lag.units`](#profilerwindowlagunits).
+
+### `profiler.window.lag.units`
+
+*Default*: SECONDS
+
+The units used to specify the `profiler.window.lag`.  This value should be 
defined along with [`profiler.window.lag`](#profilerwindowlag).
+
 ### `profiler.workers`
 
 *Default*: 1

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/config/profiler.properties 
b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index 896f8d5..fe3c475 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -22,6 +22,10 @@
 
 topology.worker.childopts=
 topology.auto-credentials=
+profiler.workers=1
+profiler.executors=0
+topology.message.timeout.secs=30
+topology.max.spout.pending=100000
 
 ##### Profiler #####
 
@@ -29,10 +33,16 @@ profiler.input.topic=indexing
 profiler.output.topic=enrichments
 profiler.period.duration=15
 profiler.period.duration.units=MINUTES
-profiler.workers=1
-profiler.executors=0
+profiler.window.duration=30
+profiler.window.duration.units=SECONDS
 profiler.ttl=30
 profiler.ttl.units=MINUTES
+profiler.window.lag=1
+profiler.window.lag.units=MINUTES
+profiler.max.routes.per.bolt=10000
+
+##### HBase #####
+
 profiler.hbase.salt.divisor=1000
 profiler.hbase.table=profiler
 profiler.hbase.column.family=P

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml 
b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 9ec5ba4..83c9fde 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -17,10 +17,12 @@
 name: "profiler"
 
 config:
-    topology.worker.childopts: ${topology.worker.childopts}
     topology.workers: ${profiler.workers}
     topology.acker.executors: ${profiler.executors}
+    topology.worker.childopts: ${topology.worker.childopts}
     topology.auto-credentials: ${topology.auto-credentials}
+    topology.message.timeout.secs: ${topology.message.timeout.secs}
+    topology.max.spout.pending: ${topology.max.spout.pending}
 
 components:
 
@@ -107,11 +109,23 @@ components:
             -   name: "withProducerConfigs"
                 args: [ref: "kafkaWriterProps"]
 
-    -   id: "kafkaDestinationHandler"
-        className: "org.apache.metron.profiler.bolt.KafkaDestinationHandler"
+    -   id: "kafkaEmitter"
+        className: "org.apache.metron.profiler.bolt.KafkaEmitter"
+
+    -   id: "hbaseEmitter"
+        className: "org.apache.metron.profiler.bolt.HBaseEmitter"
+
+    -   id: "windowDuration"
+        className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
+        constructorArgs:
+            - ${profiler.window.duration}
+            - "${profiler.window.duration.units}"
 
-    -   id: "hbaseDestinationHandler"
-        className: "org.apache.metron.profiler.bolt.HBaseDestinationHandler"
+    -   id: "windowLag"
+        className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
+        constructorArgs:
+            - ${profiler.window.lag}
+            - "${profiler.window.lag.units}"
 
 spouts:
 
@@ -129,17 +143,23 @@ bolts:
 
     -   id: "builderBolt"
         className: "org.apache.metron.profiler.bolt.ProfileBuilderBolt"
-        constructorArgs:
-            - "${kafka.zk}"
         configMethods:
+            - name: "withZookeeperUrl"
+              args: ["${kafka.zk}"]
             - name: "withPeriodDuration"
               args: [${profiler.period.duration}, 
"${profiler.period.duration.units}"]
             - name: "withProfileTimeToLive"
               args: [${profiler.ttl}, "${profiler.ttl.units}"]
-            - name: "withDestinationHandler"
-              args: [ref: "kafkaDestinationHandler"]
-            - name: "withDestinationHandler"
-              args: [ref: "hbaseDestinationHandler"]
+            - name: "withEmitter"
+              args: [ref: "kafkaEmitter"]
+            - name: "withEmitter"
+              args: [ref: "hbaseEmitter"]
+            - name: "withTumblingWindow"
+              args: [ref: "windowDuration"]
+            - name: "withLag"
+              args: [ref: "windowLag"]
+            - name: "withMaxNumberOfRoutes"
+              args: [${profiler.max.routes.per.bolt}]
 
     -   id: "hbaseBolt"
         className: "org.apache.metron.hbase.bolt.HBaseBolt"

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
deleted file mode 100644
index 2257784..0000000
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.metron.profiler.bolt;
-
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-
-/**
- * This class handles the mechanics of emitting a profile measurement to a
- * stream responsible for writing to a specific destination.
- *
- * The measurements produced by a profile can be written to one or more
- * destinations; HBase, Kafka, etc.  Each of the destinations leverage a
- * separate stream within the topology definition.
- */
-public interface DestinationHandler {
-
-  /**
-   * Each destination leverages a unique stream.  This method defines
-   * the unique stream identifier.
-   *
-   * The stream identifier must also be declared within the topology
-   * definition.
-   */
-  String getStreamId();
-
-  /**
-   * Declares the output fields for the stream.
-   * @param declarer
-   */
-  void declareOutputFields(OutputFieldsDeclarer declarer);
-
-  /**
-   * Emit the measurement.
-   * @param measurement The measurement to emit.
-   * @param collector The output collector.
-   */
-  void emit(ProfileMeasurement measurement, OutputCollector collector);
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
new file mode 100644
index 0000000..b9f57dd
--- /dev/null
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
@@ -0,0 +1,126 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Signals a flush on a fixed frequency; every X milliseconds.
+ */
+public class FixedFrequencyFlushSignal implements FlushSignal {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The latest known timestamp.
+   */
+  private long currentTime;
+
+  /**
+   * The time when the next flush should occur.
+   */
+  private long flushTime;
+
+  /**
+   * The amount of time between flushes in milliseconds.
+   */
+  private long flushFrequency;
+
+  public FixedFrequencyFlushSignal(long flushFrequencyMillis) {
+
+    if(flushFrequencyMillis < 0) {
+      throw new IllegalArgumentException("flush frequency must be >= 0");
+    }
+
+    this.flushFrequency = flushFrequencyMillis;
+    reset();
+  }
+
+  /**
+   * Resets the state used to keep track of time.
+   */
+  @Override
+  public void reset() {
+    flushTime = 0;
+    currentTime = 0;
+
+    LOG.debug("Flush counters reset");
+  }
+
+  /**
+   * Update the internal state which tracks time.
+   *
+   * @param timestamp The timestamp received within a tuple.
+   */
+  @Override
+  public void update(long timestamp) {
+
+    if(timestamp > currentTime) {
+
+      // need to update current time
+      LOG.debug("Updating current time; last={}, new={}", currentTime, 
timestamp);
+      currentTime = timestamp;
+
+    } else if ((currentTime - timestamp) > flushFrequency) {
+
+      // significantly out-of-order timestamps
+      LOG.warn("Timestamps out-of-order by '{}' ms. This may indicate a 
problem in the data. last={}, current={}",
+              (currentTime - timestamp),
+              timestamp,
+              currentTime);
+    }
+
+    if(flushTime == 0) {
+
+      // set the next time to flush
+      flushTime = currentTime + flushFrequency;
+      LOG.debug("Setting flush time; flushTime={}, currentTime={}, 
flushFreq={}",
+              flushTime,
+              currentTime,
+              flushFrequency);
+    }
+  }
+
+  /**
+   * Returns true, if it is time to flush.
+   *
+   * @return True if time to flush.  Otherwise, false.
+   */
+  @Override
+  public boolean isTimeToFlush() {
+
+    boolean flush = currentTime > flushTime;
+    LOG.debug("Flush={}, '{}' ms until flush; currentTime={}, flushTime={}",
+            flush,
+            flush ? 0 : (flushTime-currentTime),
+            currentTime,
+            flushTime);
+
+    return flush;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return currentTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FlushSignal.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FlushSignal.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FlushSignal.java
new file mode 100644
index 0000000..0a9fc76
--- /dev/null
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FlushSignal.java
@@ -0,0 +1,51 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+/**
+ * Signals when it is time to flush a profile.
+ */
+public interface FlushSignal {
+
+  /**
+   * Returns true, if it is time to flush.
+   *
+   * @return True if time to flush.  Otherwise, false.
+   */
+  boolean isTimeToFlush();
+
+  /**
+   * Update the signaller with a known timestamp.
+   *
+   * @param timestamp A timestamp expected to be epoch milliseconds
+   */
+  void update(long timestamp);
+
+  /**
+   * Reset the signaller.
+   */
+  void reset();
+
+  /**
+   * Returns the current time in epoch milliseconds.
+   * @return The current time in epoch milliseconds.
+   */
+  long currentTimeMillis();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
deleted file mode 100644
index 4fa5dc1..0000000
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.metron.profiler.bolt;
-
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.io.Serializable;
-
-/**
- * Handles emitting a ProfileMeasurement to the stream which writes
- * profile measurements to HBase.
- */
-public class HBaseDestinationHandler implements DestinationHandler, 
Serializable {
-
-  /**
-   * The stream identifier used for this destination;
-   */
-  private String streamId = "hbase";
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declareStream(getStreamId(), new Fields("measurement"));
-  }
-
-  @Override
-  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
-    collector.emit(getStreamId(), new Values(measurement));
-  }
-
-  @Override
-  public String getStreamId() {
-    return streamId;
-  }
-
-  public void setStreamId(String streamId) {
-    this.streamId = streamId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
new file mode 100644
index 0000000..8e1229a
--- /dev/null
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Responsible for emitting a {@link ProfileMeasurement} to an output stream 
that will
+ * persist data in HBase.
+ */
+public class HBaseEmitter implements ProfileMeasurementEmitter, Serializable {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The stream identifier used for this destination;
+   */
+  private  String streamId = "hbase";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream(getStreamId(), new Fields("measurement"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+    collector.emit(getStreamId(), new Values(measurement));
+  }
+
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
deleted file mode 100644
index be82468..0000000
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles emitting a ProfileMeasurement to the stream which writes
- * profile measurements to Kafka.
- */
-public class KafkaDestinationHandler implements DestinationHandler, 
Serializable {
-
-  protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  /**
-   * The stream identifier used for this destination;
-   */
-  private String streamId = "kafka";
-
-  /**
-   * The 'source.type' of messages originating from the Profiler.
-   */
-  private String sourceType = "profiler";
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    // the kafka writer expects a field named 'message'
-    declarer.declareStream(getStreamId(), new Fields("message"));
-  }
-
-  @Override
-  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
-
-    JSONObject message = new JSONObject();
-    message.put("profile", measurement.getDefinition().getProfile());
-    message.put("entity", measurement.getEntity());
-    message.put("period", measurement.getPeriod().getPeriod());
-    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
-    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
-    message.put("timestamp", System.currentTimeMillis());
-    message.put("source.type", sourceType);
-    message.put("is_alert", "true");
-
-    // append each of the triage values to the message
-    measurement.getTriageValues().forEach((key, value) -> {
-
-      if(isValidType(value)) {
-        message.put(key, value);
-
-      } else {
-        LOG.error(String.format("triage expression has invalid type. expect 
primitive types only. skipping: profile=%s, entity=%s, expression=%s, type=%s",
-                measurement.getDefinition().getProfile(), 
measurement.getEntity(), key, ClassUtils.getShortClassName(value, "null")));
-      }
-    });
-
-    collector.emit(getStreamId(), new Values(message));
-  }
-
-  /**
-   * The result of a profile's triage expressions must be a string or 
primitive type.
-   *
-   * This ensures that the value can be easily serialized and appended to a 
message destined for Kafka.
-   *
-   * @param value The value of a triage expression.
-   * @return True, if the type of the value is valid.
-   */
-  private boolean isValidType(Object value) {
-    return value != null && (value instanceof String || 
ClassUtils.isPrimitiveOrWrapper(value.getClass()));
-  }
-
-  @Override
-  public String getStreamId() {
-    return streamId;
-  }
-
-  public void setStreamId(String streamId) {
-    this.streamId = streamId;
-  }
-
-  public void setSourceType(String sourceType) {
-    this.sourceType = sourceType;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
new file mode 100644
index 0000000..29d1a49
--- /dev/null
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
@@ -0,0 +1,114 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for emitting a {@link ProfileMeasurement} to an output stream 
that will
+ * persist data in HBase.
+ */
+public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The stream identifier used for this destination;
+   */
+  private String streamId = "kafka";
+
+  /**
+   * The 'source.type' of messages originating from the Profiler.
+   */
+  private String sourceType = "profiler";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    // the kafka writer expects a field named 'message'
+    declarer.declareStream(getStreamId(), new Fields("message"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+
+    JSONObject message = new JSONObject();
+    message.put("profile", measurement.getDefinition().getProfile());
+    message.put("entity", measurement.getEntity());
+    message.put("period", measurement.getPeriod().getPeriod());
+    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
+    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
+    message.put("timestamp", System.currentTimeMillis());
+    message.put("source.type", sourceType);
+    message.put("is_alert", "true");
+
+    // append each of the triage values to the message
+    measurement.getTriageValues().forEach((key, value) -> {
+
+      if(isValidType(value)) {
+        message.put(key, value);
+
+      } else {
+        LOG.error(String.format(
+                "triage expression must result in primitive type, skipping; 
type=%s, profile=%s, entity=%s, expr=%s",
+                ClassUtils.getShortClassName(value, "null"),
+                measurement.getDefinition().getProfile(),
+                measurement.getEntity(),
+                key));
+      }
+    });
+
+    collector.emit(getStreamId(), new Values(message));
+  }
+
+  /**
+   * The result of a profile's triage expressions must be a string or 
primitive type.
+   *
+   * This ensures that the value can be easily serialized and appended to a 
message destined for Kafka.
+   *
+   * @param value The value of a triage expression.
+   * @return True, if the type of the value is valid.
+   */
+  private boolean isValidType(Object value) {
+    return value != null && (value instanceof String || 
ClassUtils.isPrimitiveOrWrapper(value.getClass()));
+  }
+
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+
+  public void setSourceType(String sourceType) {
+    this.sourceType = sourceType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ManualFlushSignal.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ManualFlushSignal.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ManualFlushSignal.java
new file mode 100644
index 0000000..d8e9539
--- /dev/null
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ManualFlushSignal.java
@@ -0,0 +1,54 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+/**
+ * Signals that a flush should occur.
+ *
+ * <p>The flush signal can be turned on or off like a switch as needed.  Most 
useful for testing.
+ */
+public class ManualFlushSignal implements FlushSignal {
+
+  private boolean flushNow = false;
+
+  public void setFlushNow(boolean flushNow) {
+    this.flushNow = flushNow;
+  }
+
+  @Override
+  public boolean isTimeToFlush() {
+    return flushNow;
+  }
+
+  @Override
+  public void update(long timestamp) {
+    // nothing to do
+  }
+
+  @Override
+  public void reset() {
+    // nothing to do.
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    // not needed
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 3c8d875..ffe823f 100644
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -20,19 +20,36 @@
 
 package org.apache.metron.profiler.bolt;
 
-import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater;
+import org.apache.metron.common.zookeeper.configurations.Reloadable;
 import org.apache.metron.profiler.DefaultMessageDistributor;
+import org.apache.metron.profiler.MessageDistributor;
 import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.zookeeper.SimpleEventListener;
+import org.apache.metron.zookeeper.ZKCache;
 import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.windowing.TupleWindow;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
@@ -42,42 +59,76 @@ import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static java.lang.String.format;
+import static 
org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD;
+import static 
org.apache.metron.profiler.bolt.ProfileSplitterBolt.MESSAGE_TUPLE_FIELD;
+import static 
org.apache.metron.profiler.bolt.ProfileSplitterBolt.PROFILE_TUPLE_FIELD;
+import static 
org.apache.metron.profiler.bolt.ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD;
 
 /**
- * A bolt that is responsible for building a Profile.
- *
- * This bolt maintains the state required to build a Profile.  When the window
- * period expires, the data is summarized as a ProfileMeasurement, all state is
- * flushed, and the ProfileMeasurement is emitted.
+ * A Storm bolt that is responsible for building a profile.
  *
+ * <p>This bolt maintains the state required to build a Profile.  When the 
window
+ * period expires, the data is summarized as a {@link ProfileMeasurement}, all 
state is
+ * flushed, and the {@link ProfileMeasurement} is emitted.
  */
-public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
+public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable 
{
 
   protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private OutputCollector collector;
 
   /**
+   * The URL to connect to Zookeeper.
+   */
+  private String zookeeperUrl;
+
+  /**
+   * The Zookeeper client connection.
+   */
+  protected CuratorFramework zookeeperClient;
+
+  /**
+   * The Zookeeper cache.
+   */
+  protected ZKCache zookeeperCache;
+
+  /**
+   * Manages configuration for the Profiler.
+   */
+  private ProfilerConfigurations configurations;
+
+  /**
    * The duration of each profile period in milliseconds.
    */
   private long periodDurationMillis;
 
   /**
+   * The duration of Storm's event window.
+   */
+  private long windowDurationMillis;
+
+  /**
    * If a message has not been applied to a Profile in this number of 
milliseconds,
    * the Profile will be forgotten and its resources will be cleaned up.
    *
-   * WARNING: The TTL must be at least greater than the period duration.
+   * <p>WARNING: The TTL must be at least greater than the period duration.
    */
   private long profileTimeToLiveMillis;
 
   /**
+   * The maximum number of {@link MessageRoute} routes that will be maintained 
by
+   * this bolt.  After this value is exceeded, lesser used routes will be 
evicted
+   * from the internal cache.
+   */
+  private long maxNumberOfRoutes;
+
+  /**
    * Distributes messages to the profile builders.
    */
-  private DefaultMessageDistributor messageDistributor;
+  private MessageDistributor messageDistributor;
 
   /**
    * Parses JSON messages.
@@ -85,112 +136,245 @@ public class ProfileBuilderBolt extends 
ConfiguredProfilerBolt {
   private transient JSONParser parser;
 
   /**
-   * The measurements produced by a profile can be written to multiple 
destinations.  Each
-   * destination is handled by a separate `DestinationHandler`.
+   * Responsible for emitting {@link ProfileMeasurement} values.
+   *
+   * <p>The {@link ProfileMeasurement} values generated by a profile can be 
written to
+   * multiple endpoints like HBase or Kafka.  Each endpoint is handled by a 
separate
+   * {@link ProfileMeasurementEmitter}.
    */
-  private List<DestinationHandler> destinationHandlers;
+  private List<ProfileMeasurementEmitter> emitters;
 
   /**
-   * @param zookeeperUrl The Zookeeper URL that contains the configuration 
data.
+   * Signals when it is time to flush.
    */
-  public ProfileBuilderBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-    this.destinationHandlers = new ArrayList<>();
-  }
+  private FlushSignal flushSignal;
 
-  /**
-   * Defines the frequency at which the bolt will receive tick tuples.  Tick 
tuples are
-   * used to control how often a profile is flushed.
-   */
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    // how frequently should the bolt receive tick tuples?
-    Config conf = new Config();
-    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 
TimeUnit.MILLISECONDS.toSeconds(periodDurationMillis));
-    return conf;
+  public ProfileBuilderBolt() {
+    this.emitters = new ArrayList<>();
   }
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector 
collector) {
     super.prepare(stormConf, context, collector);
 
+    if(periodDurationMillis <= 0) {
+      throw new IllegalArgumentException("expect 'profiler.period.duration' >= 
0");
+    }
+    if(profileTimeToLiveMillis <= 0) {
+      throw new IllegalArgumentException("expect 'profiler.ttl' >= 0");
+    }
     if(profileTimeToLiveMillis < periodDurationMillis) {
-      throw new IllegalStateException(format(
-              "invalid configuration: expect profile TTL (%d) to be greater 
than period duration (%d)",
-              profileTimeToLiveMillis,
-              periodDurationMillis));
+      throw new IllegalArgumentException("expect 'profiler.ttl' >= 
'profiler.period.duration'");
     }
+    if(maxNumberOfRoutes <= 0) {
+      throw new IllegalArgumentException("expect 
'profiler.max.routes.per.bolt' > 0");
+    }
+    if(windowDurationMillis <= 0) {
+      throw new IllegalArgumentException("expect 'profiler.window.duration' > 
0");
+    }
+    if(windowDurationMillis > periodDurationMillis) {
+      throw new IllegalArgumentException("expect 'profiler.period.duration' >= 
'profiler.window.duration'");
+    }
+    if(periodDurationMillis % windowDurationMillis != 0) {
+      throw new IllegalArgumentException("expect 'profiler.period.duration' % 
'profiler.window.duration' == 0");
+    }
+
     this.collector = collector;
     this.parser = new JSONParser();
-    this.messageDistributor = new 
DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis);
+    this.messageDistributor = new 
DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis, 
maxNumberOfRoutes);
+    this.configurations = new ProfilerConfigurations();
+    this.flushSignal = new FixedFrequencyFlushSignal(periodDurationMillis);
+    setupZookeeper();
+  }
+
+  @Override
+  public void cleanup() {
+    zookeeperCache.close();
+    zookeeperClient.close();
+  }
+
+  private void setupZookeeper() {
+    try {
+      if (zookeeperClient == null) {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zookeeperClient = CuratorFrameworkFactory.newClient(zookeeperUrl, 
retryPolicy);
+      }
+      zookeeperClient.start();
+
+      // this is temporary to ensure that any validation passes. the 
individual bolt
+      // will reinitialize stellar to dynamically pull from zookeeper.
+      ConfigurationsUtils.setupStellarStatically(zookeeperClient);
+      if (zookeeperCache == null) {
+        ConfigurationsUpdater<ProfilerConfigurations> updater = 
createUpdater();
+        SimpleEventListener listener = new SimpleEventListener.Builder()
+                .with( updater::update, TreeCacheEvent.Type.NODE_ADDED, 
TreeCacheEvent.Type.NODE_UPDATED)
+                .with( updater::delete, TreeCacheEvent.Type.NODE_REMOVED)
+                .build();
+        zookeeperCache = new ZKCache.Builder()
+                .withClient(zookeeperClient)
+                .withListener(listener)
+                .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT)
+                .build();
+        updater.forceUpdate(zookeeperClient);
+        zookeeperCache.start();
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() {
+    return new ProfilerUpdater(this, this::getConfigurations);
+  }
+
+  public ProfilerConfigurations getConfigurations() {
+    return configurations;
+  }
+
+  @Override
+  public void reloadCallback(String name, ConfigurationType type) {
+    // nothing to do
   }
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    if(destinationHandlers.size() == 0) {
+
+    if(emitters.size() == 0) {
       throw new IllegalStateException("At least one destination handler must 
be defined.");
     }
 
-    // each destination will define its own stream
-    destinationHandlers.forEach(dest -> dest.declareOutputFields(declarer));
+    // allow each emitter to define its own stream
+    emitters.forEach(emitter -> emitter.declareOutputFields(declarer));
+  }
+
+  /**
+   * Defines the frequency at which the bolt will receive tick tuples.  Tick 
tuples are
+   * used to control how often a profile is flushed.
+   */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+
+    Map<String, Object> conf = super.getComponentConfiguration();
+    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 
TimeUnit.MILLISECONDS.toSeconds(profileTimeToLiveMillis));
+    return conf;
   }
 
   private Context getStellarContext() {
+
     Map<String, Object> global = getConfigurations().getGlobalConfig();
     return new Context.Builder()
-            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> zookeeperClient)
             .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
             .with(Context.Capabilities.STELLAR_CONFIG, () -> global)
             .build();
   }
 
-  /**
-   * Expect to receive either a tick tuple or a telemetry message that needs 
applied
-   * to a profile.
-   * @param input The tuple.
-   */
   @Override
-  public void execute(Tuple input) {
+  public void execute(TupleWindow window) {
+
+    LOG.debug("Tuple window contains {} tuple(s), {} expired, {} new",
+            CollectionUtils.size(window.get()),
+            CollectionUtils.size(window.getExpired()),
+            CollectionUtils.size(window.getNew()));
+
     try {
-      if(TupleUtils.isTick(input)) {
-        handleTick();
 
-      } else {
-        handleMessage(input);
+      // handle each tuple in the window
+      for(Tuple tuple : window.get()) {
+
+        if(TupleUtils.isTick(tuple)) {
+          handleTick();
+
+        } else {
+          handleMessage(tuple);
+        }
+      }
+
+      // time to flush?
+      if(flushSignal.isTimeToFlush()) {
+        flushSignal.reset();
+
+        // flush the active profiles
+        List<ProfileMeasurement> measurements = messageDistributor.flush();
+        emitMeasurements(measurements);
+
+        LOG.debug("Flushed active profiles and found {} measurement(s).", 
measurements.size());
       }
 
     } catch (Throwable e) {
-      LOG.error(format("Unexpected failure: message='%s', tuple='%s'", 
e.getMessage(), input), e);
-      collector.reportError(e);
 
-    } finally {
-      collector.ack(input);
+      LOG.error("Unexpected error", e);
+      collector.reportError(e);
     }
   }
 
   /**
-   * Handles a telemetry message
-   * @param input The tuple.
+   * Flush all expired profiles when a 'tick' is received.
+   *
+   * If a profile has not received a message for an extended period of time 
then it is
+   * marked as expired.  Periodically we need to flush these expired profiles 
to ensure
+   * that their state is not lost.
    */
-  private void handleMessage(Tuple input) throws ExecutionException {
-    JSONObject message = getField("message", input, JSONObject.class);
-    ProfileConfig definition = getField("profile", input, ProfileConfig.class);
-    String entity = getField("entity", input, String.class);
-    MessageRoute route = new MessageRoute(definition, entity);
+  private void handleTick() {
+
+    // flush the expired profiles
+    List<ProfileMeasurement> measurements = messageDistributor.flushExpired();
+    emitMeasurements(measurements);
 
-    messageDistributor.distribute(message, route, getStellarContext());
+    LOG.debug("Flushed expired profiles and found {} measurement(s).", 
measurements.size());
   }
 
   /**
-   * Handles a tick tuple.
+   * Handles the processing of a single tuple.
+   *
+   * @param input The tuple containing a telemetry message.
    */
-  private void handleTick() {
-    List<ProfileMeasurement> measurements = messageDistributor.flush();
+  private void handleMessage(Tuple input) {
+
+    // crack open the tuple
+    JSONObject message = getField(MESSAGE_TUPLE_FIELD, input, 
JSONObject.class);
+    ProfileConfig definition = getField(PROFILE_TUPLE_FIELD, input, 
ProfileConfig.class);
+    String entity = getField(ENTITY_TUPLE_FIELD, input, String.class);
+    Long timestamp = getField(TIMESTAMP_TUPLE_FIELD, input, Long.class);
+
+    // keep track of time
+    flushSignal.update(timestamp);
+    
+    // distribute the message
+    MessageRoute route = new MessageRoute(definition, entity);
+    messageDistributor.distribute(message, timestamp, route, 
getStellarContext());
 
-    // forward the measurements to each destination handler
-    for(ProfileMeasurement m : measurements ) {
-      destinationHandlers.forEach(handler -> handler.emit(m, collector));
+    LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", 
definition.getProfile(), entity, timestamp);
+  }
+
+  /**
+   * Handles the {@code ProfileMeasurement}s that are created when a profile 
is flushed.
+   *
+   * @param measurements The measurements to handle.
+   */
+  private void emitMeasurements(List<ProfileMeasurement> measurements) {
+
+    // flush each profile
+    for(ProfileMeasurement measurement: measurements) {
+
+      // allow each 'emitter' to emit the measurement
+      for (ProfileMeasurementEmitter emitter : emitters) {
+        emitter.emit(measurement, collector);
+
+        LOG.debug("Measurement emitted; stream={}, profile={}, entity={}, 
value={}, start={}, end={}, duration={}, period={}",
+                emitter.getStreamId(),
+                measurement.getProfileName(),
+                measurement.getEntity(),
+                measurement.getProfileValue(),
+                measurement.getPeriod().getStartTimeMillis(),
+                measurement.getPeriod().getEndTimeMillis(),
+                measurement.getPeriod().getDurationMillis(),
+                measurement.getPeriod().getPeriod());
+      }
     }
+
+    LOG.debug("Emitted {} measurement(s).", measurements.size());
   }
 
   /**
@@ -202,14 +386,27 @@ public class ProfileBuilderBolt extends 
ConfiguredProfilerBolt {
    * @param <T> The type of the field value.
    */
   private <T> T getField(String fieldName, Tuple tuple, Class<T> clazz) {
+
     T value = ConversionUtils.convert(tuple.getValueByField(fieldName), clazz);
     if(value == null) {
-      throw new IllegalStateException(format("invalid tuple received: missing 
or invalid field '%s'", fieldName));
+      throw new IllegalStateException(format("Invalid tuple: missing or 
invalid field '%s'", fieldName));
     }
 
     return value;
   }
 
+  @Override
+  public BaseWindowedBolt withTumblingWindow(BaseWindowedBolt.Duration 
duration) {
+
+    // need to capture the window duration for setting the flush count down
+    this.windowDurationMillis = duration.value;
+    return super.withTumblingWindow(duration);
+  }
+
+  public long getPeriodDurationMillis() {
+    return periodDurationMillis;
+  }
+
   public ProfileBuilderBolt withPeriodDurationMillis(long 
periodDurationMillis) {
     this.periodDurationMillis = periodDurationMillis;
     return this;
@@ -224,16 +421,55 @@ public class ProfileBuilderBolt extends 
ConfiguredProfilerBolt {
     return this;
   }
 
+  public long getWindowDurationMillis() {
+    return windowDurationMillis;
+  }
+
   public ProfileBuilderBolt withProfileTimeToLive(int duration, TimeUnit 
units) {
     return withProfileTimeToLiveMillis(units.toMillis(duration));
   }
 
-  public ProfileBuilderBolt withDestinationHandler(DestinationHandler handler) 
{
-    this.destinationHandlers.add(handler);
+  public ProfileBuilderBolt withEmitter(ProfileMeasurementEmitter emitter) {
+    this.emitters.add(emitter);
     return this;
   }
 
-  public DefaultMessageDistributor getMessageDistributor() {
+  public MessageDistributor getMessageDistributor() {
     return messageDistributor;
   }
+
+  public ProfileBuilderBolt withZookeeperUrl(String zookeeperUrl) {
+    this.zookeeperUrl = zookeeperUrl;
+    return this;
+  }
+
+  public ProfileBuilderBolt withZookeeperClient(CuratorFramework 
zookeeperClient) {
+    this.zookeeperClient = zookeeperClient;
+    return this;
+  }
+
+  public ProfileBuilderBolt withZookeeperCache(ZKCache zookeeperCache) {
+    this.zookeeperCache = zookeeperCache;
+    return this;
+  }
+
+  public ProfileBuilderBolt withProfilerConfigurations(ProfilerConfigurations 
configurations) {
+    this.configurations = configurations;
+    return this;
+  }
+
+  public ProfileBuilderBolt withMaxNumberOfRoutes(long maxNumberOfRoutes) {
+    this.maxNumberOfRoutes = maxNumberOfRoutes;
+    return this;
+  }
+
+  public ProfileBuilderBolt withFlushSignal(FlushSignal flushSignal) {
+    this.flushSignal = flushSignal;
+    return this;
+  }
+
+  public ProfileBuilderBolt withMessageDistributor(MessageDistributor 
messageDistributor) {
+    this.messageDistributor = messageDistributor;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileMeasurementEmitter.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileMeasurementEmitter.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileMeasurementEmitter.java
new file mode 100644
index 0000000..e1fe4e1
--- /dev/null
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileMeasurementEmitter.java
@@ -0,0 +1,59 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+
+/**
+ * Handles the mechanics of emitting a {@link ProfileMeasurement} to an output
+ * stream.
+ *
+ * <p>The Profiler allows the measurements produced by a profile to be written 
to
+ * multiple endpoints such as HBase and Kafka.  Each of these endpoints will 
have
+ * a unique stream that the measurements are written to.
+ *
+ * <p>Implementors of this interface are responsible for defining and managing 
the
+ * output stream for a specific endpoint.
+ */
+public interface ProfileMeasurementEmitter {
+
+  /**
+   * Each destination leverages a unique stream.  This method defines
+   * the unique stream identifier.
+   *
+   * The stream identifier must also be declared within the topology
+   * definition.
+   */
+  String getStreamId();
+
+  /**
+   * Declares the output fields for the stream.
+   * @param declarer
+   */
+  void declareOutputFields(OutputFieldsDeclarer declarer);
+
+  /**
+   * Emit the measurement.
+   * @param measurement The measurement to emit.
+   * @param collector The output collector.
+   */
+  void emit(ProfileMeasurement measurement, OutputCollector collector);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index a453c66..4e62eee 100644
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -21,10 +21,14 @@
 package org.apache.metron.profiler.bolt;
 
 import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
-import org.apache.metron.profiler.MessageRouter;
-import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.DefaultMessageRouter;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.MessageRouter;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.ClockFactory;
+import org.apache.metron.profiler.clock.DefaultClockFactory;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -42,16 +46,45 @@ import java.io.UnsupportedEncodingException;
 import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
- * The bolt responsible for filtering incoming messages and directing
- * each to the one or more bolts responsible for building a Profile.  Each
- * message may be needed by 0, 1 or even many Profiles.
+ * The Storm bolt responsible for filtering incoming messages and directing
+ * each to the downstream bolts responsible for building a Profile.
  */
 public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
 
   protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  /**
+   * The name of the tuple field containing the entity.
+   *
+   * This is the result of executing a profile's 'entity' Stellar expression 
within
+   * the context of the telemetry message.
+   */
+  protected static final String ENTITY_TUPLE_FIELD = "entity";
+
+  /**
+   * The name of the tuple field containing the profile definition.
+   */
+  protected static final String PROFILE_TUPLE_FIELD = "profile";
+
+  /**
+   * The name of the tuple field containing the telemetry message.
+   */
+  protected static final String MESSAGE_TUPLE_FIELD = "message";
+
+  /**
+   * The name of the tuple field containing the timestamp of the telemetry 
message.
+   *
+   * <p>If a 'timestampField' has been configured, the timestamp was extracted
+   * from a field within the telemetry message.  This enables event time 
processing.
+   *
+   * <p>If a 'timestampField' has not been configured, then the Profiler uses
+   * processing time and the timestamp originated from the system clock.
+   */
+  protected static final String TIMESTAMP_TUPLE_FIELD = "timestamp";
+
   private OutputCollector collector;
 
   /**
@@ -62,7 +95,12 @@ public class ProfileSplitterBolt extends 
ConfiguredProfilerBolt {
   /**
    * The router responsible for routing incoming messages.
    */
-  private MessageRouter router;
+  private transient MessageRouter router;
+
+  /**
+   * Responsible for creating the {@link Clock}.
+   */
+  private transient ClockFactory clockFactory;
 
   /**
    * @param zookeeperUrl The Zookeeper URL that contains the configuration for 
this bolt.
@@ -77,6 +115,7 @@ public class ProfileSplitterBolt extends 
ConfiguredProfilerBolt {
     this.collector = collector;
     this.parser = new JSONParser();
     this.router = new DefaultMessageRouter(getStellarContext());
+    this.clockFactory = new DefaultClockFactory();
   }
 
   private Context getStellarContext() {
@@ -88,13 +127,26 @@ public class ProfileSplitterBolt extends 
ConfiguredProfilerBolt {
             .build();
   }
 
+  /**
+   * This bolt consumes telemetry messages and determines if the message is 
needed
+   * by any of the profiles.  The message is then routed to one or more 
downstream
+   * bolts that are responsible for building each profile
+   *
+   * <p>The outgoing tuples are timestamped so that Storm's window and 
event-time
+   * processing functionality can recognize the time of each message.
+   *
+   * <p>The timestamp that is attached to each outgoing tuple is what decides 
if
+   * the Profiler is operating on processing time or event time.
+   *
+   * @param input The tuple.
+   */
   @Override
   public void execute(Tuple input) {
     try {
       doExecute(input);
 
     } catch (IllegalArgumentException | ParseException | 
UnsupportedEncodingException e) {
-      LOG.error("Unexpected failure: message='{}', tuple='{}'", 
e.getMessage(), input, e);
+      LOG.error("Unexpected error", e);
       collector.reportError(e);
 
     } finally {
@@ -103,41 +155,85 @@ public class ProfileSplitterBolt extends 
ConfiguredProfilerBolt {
   }
 
   private void doExecute(Tuple input) throws ParseException, 
UnsupportedEncodingException {
+
     // retrieve the input message
     byte[] data = input.getBinary(0);
     JSONObject message = (JSONObject) parser.parse(new String(data, "UTF8"));
 
     // ensure there is a valid profiler configuration
     ProfilerConfig config = getProfilerConfig();
-    if(config != null) {
+    if(config != null && config.getProfiles().size() > 0) {
+
+      // what time is it?
+      Clock clock = clockFactory.createClock(config);
+      Optional<Long> timestamp = clock.currentTimeMillis(message);
 
-      // emit a message for each 'route'
-      List<MessageRoute> routes = router.route(message, config, 
getStellarContext());
-      for(MessageRoute route : routes) {
-        collector.emit(input, new Values(route.getEntity(), 
route.getProfileDefinition(), message));
-      }
+      // route the message.  if a message does not contain the timestamp 
field, it cannot be routed.
+      timestamp.ifPresent(ts -> routeMessage(input, message, config, ts));
 
     } else {
-      LOG.warn("No Profiler configuration found.  Nothing to do.");
+      LOG.debug("No Profiler configuration found.  Nothing to do.");
     }
   }
 
   /**
+   * Route a message based on the Profiler configuration.
+   * @param input The input tuple on which to anchor.
+   * @param message The telemetry message.
+   * @param config The Profiler configuration.
+   * @param timestamp The timestamp of the telemetry message.
+   */
+  private void routeMessage(Tuple input, JSONObject message, ProfilerConfig 
config, Long timestamp) {
+
+    // emit a tuple for each 'route'
+    List<MessageRoute> routes = router.route(message, config, 
getStellarContext());
+    for (MessageRoute route : routes) {
+
+      Values values = createValues(message, timestamp, route);
+      collector.emit(input, values);
+    }
+
+    LOG.debug("Found {} route(s) for message with timestamp={}", 
routes.size(), timestamp);
+  }
+
+  /**
    * Each emitted tuple contains the following fields.
    * <p>
    * <ol>
-   * <li> entity - The name of the entity.  The actual result of executing the 
Stellar expression.
-   * <li> profile - The profile definition that the message needs applied to.
-   * <li> message - The message containing JSON-formatted data that needs 
applied to a profile.
+   * <li>message - The message containing JSON-formatted data that needs 
applied to a profile.
+   * <li>timestamp - The timestamp of the message.
+   * <li>entity - The name of the entity.  The actual result of executing the 
Stellar expression.
+   * <li>profile - The profile definition that the message needs applied to.
    * </ol>
    * <p>
    */
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("entity", "profile", "message"));
+
+    // the order here must match 'createValues'
+    Fields fields = new Fields(MESSAGE_TUPLE_FIELD, TIMESTAMP_TUPLE_FIELD, 
ENTITY_TUPLE_FIELD, PROFILE_TUPLE_FIELD);
+    declarer.declare(fields);
+  }
+
+  /**
+   * Creates the {@link Values} attached to the outgoing tuple.
+   *
+   * @param message The telemetry message.
+   * @param timestamp The timestamp of the message.
+   * @param route The route the message must take.
+   * @return
+   */
+  private Values createValues(JSONObject message, Long timestamp, MessageRoute 
route) {
+
+    // the order here must match `declareOutputFields`
+    return new Values(message, timestamp, route.getEntity(), 
route.getProfileDefinition());
   }
 
   protected MessageRouter getMessageRouter() {
     return router;
   }
+
+  public void setClockFactory(ClockFactory clockFactory) {
+    this.clockFactory = clockFactory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
 
b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
new file mode 100644
index 0000000..9d727a3
--- /dev/null
+++ 
b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
@@ -0,0 +1,12 @@
+{
+  "profiles": [
+    {
+      "profile": "event-time-test",
+      "foreach": "ip_src_addr",
+      "init":   { "counter": "0" },
+      "update": { "counter": "counter + 1" },
+      "result": "counter"
+    }
+  ],
+  "timestampField": "timestamp"
+}
\ No newline at end of file

Reply via email to