[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/metron/pull/977


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-06 Thread mmiklavc
Github user mmiklavc commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r179871500
  
--- Diff: 
metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
 ---
@@ -262,11 +262,19 @@ public ProfileBuilder getBuilder(MessageRoute route, 
Context context) throws Exe
   /**
* Builds the key that is used to lookup the {@link ProfileBuilder} 
within the cache.
*
+   * The cache key is built using the hash codes of the profile and 
entity name.  If the profile
+   * definition is ever changed, the same cache entry will not be reused.  
This ensures that no
+   * state can be carried over from the old definition into the new, which 
might result in an
+   * invalid profile measurement.
+   *
* @param profile The profile definition.
* @param entity The entity.
*/
-  private String cacheKey(ProfileConfig profile, String entity) {
-return format("%s:%s", profile, entity);
+  private int cacheKey(ProfileConfig profile, String entity) {
+return new HashCodeBuilder(17, 37)
--- End diff --

Thanks for the explanation @nickwallen, makes sense.


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-06 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r179868541
  
--- Diff: 
metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
 ---
@@ -262,11 +262,19 @@ public ProfileBuilder getBuilder(MessageRoute route, 
Context context) throws Exe
   /**
* Builds the key that is used to lookup the {@link ProfileBuilder} 
within the cache.
*
+   * The cache key is built using the hash codes of the profile and 
entity name.  If the profile
+   * definition is ever changed, the same cache entry will not be reused.  
This ensures that no
+   * state can be carried over from the old definition into the new, which 
might result in an
+   * invalid profile measurement.
+   *
* @param profile The profile definition.
* @param entity The entity.
*/
-  private String cacheKey(ProfileConfig profile, String entity) {
-return format("%s:%s", profile, entity);
+  private int cacheKey(ProfileConfig profile, String entity) {
+return new HashCodeBuilder(17, 37)
--- End diff --

That state is maintained in a `ProfileBuilder` stored in this cache.  If 
the profile definition changes, the cache key would change, which would force 
it to start using a different `ProfileBuilder` instance. 

Say I had a v1.0 of the profile that has been running and now I make 
changes, so I'll call that version 2.0 of the profile.  We'd have a 
`ProfileBuilder` that handles v1.0 of the profile definition and another that 
handles v2.0 of the profile.

The v1.0 instance will stop receiving messages because that profile 
definition no longer exists.  The TTL for the profile will lapse and the 
profile will be marked as "expired".  Then periodically this timer thread will 
trigger a flush of all expired profiles.  The state that was in v1.0 will then 
be flushed and stored.

The v2.0 instance will start receiving messages and building its state.  
This instance will remain "active" because it is receiving messages.  This 
active profile will flush when the period expires and its state will be stored.

It is not safe to mix state when a profile definition is changed by a user. 
 You don't know how the profile was changed and whether the change was 
compatible or not. 


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-06 Thread mmiklavc
Github user mmiklavc commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r179860237
  
--- Diff: 
metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
 ---
@@ -262,11 +262,19 @@ public ProfileBuilder getBuilder(MessageRoute route, 
Context context) throws Exe
   /**
* Builds the key that is used to lookup the {@link ProfileBuilder} 
within the cache.
*
+   * The cache key is built using the hash codes of the profile and 
entity name.  If the profile
+   * definition is ever changed, the same cache entry will not be reused.  
This ensures that no
+   * state can be carried over from the old definition into the new, which 
might result in an
+   * invalid profile measurement.
+   *
* @param profile The profile definition.
* @param entity The entity.
*/
-  private String cacheKey(ProfileConfig profile, String entity) {
-return format("%s:%s", profile, entity);
+  private int cacheKey(ProfileConfig profile, String entity) {
+return new HashCodeBuilder(17, 37)
--- End diff --

I'm not as familiar with this functionality - How do we cut over/end an 
existing profile when a profile definition is changed? Is there any continuity 
in the calculations or is it an immediate start over from scratch?


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-02 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r178600804
  
--- Diff: 
metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
 ---
@@ -70,247 +66,103 @@
   private static final String TEST_RESOURCES = 
"../../metron-analytics/metron-profiler/src/test";
   private static final String FLUX_PATH = 
"../metron-profiler/src/main/flux/profiler/remote.yaml";
 
-  /**
-   * {
-   * "ip_src_addr": "10.0.0.1",
-   * "protocol": "HTTPS",
-   * "length": 10,
-   * "bytes_in": 234
-   * }
-   */
-  @Multiline
-  private static String message1;
-
-  /**
-   * {
-   * "ip_src_addr": "10.0.0.2",
-   * "protocol": "HTTP",
-   * "length": 20,
-   * "bytes_in": 390
-   * }
-   */
-  @Multiline
-  private static String message2;
-
-  /**
-   * {
-   * "ip_src_addr": "10.0.0.3",
-   * "protocol": "DNS",
-   * "length": 30,
-   * "bytes_in": 560
-   * }
-   */
-  @Multiline
-  private static String message3;
-
-  private static ColumnBuilder columnBuilder;
-  private static ZKServerComponent zkComponent;
-  private static FluxTopologyComponent fluxComponent;
-  private static KafkaComponent kafkaComponent;
-  private static ConfigUploadComponent configUploadComponent;
-  private static ComponentRunner runner;
-  private static MockHTable profilerTable;
+  public static final long startAt = 10;
+  public static final String entity = "10.0.0.1";
 
   private static final String tableName = "profiler";
   private static final String columnFamily = "P";
-  private static final double epsilon = 0.001;
   private static final String inputTopic = Constants.INDEXING_TOPIC;
   private static final String outputTopic = "profiles";
   private static final int saltDivisor = 10;
 
-  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(5);
+  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1);
   private static final long windowDurationMillis = 
TimeUnit.SECONDS.toMillis(5);
-  private static final long periodDurationMillis = 
TimeUnit.SECONDS.toMillis(15);
-  private static final long profileTimeToLiveMillis = 
TimeUnit.SECONDS.toMillis(20);
+  private static final long periodDurationMillis = 
TimeUnit.SECONDS.toMillis(10);
+  private static final long profileTimeToLiveMillis = 
TimeUnit.SECONDS.toMillis(15);
   private static final long maxRoutesPerBolt = 10;
 
-  /**
-   * Tests the first example contained within the README.
-   */
-  @Test
-  public void testExample1() throws Exception {
-
-uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
-
-// start the topology and write test messages to kafka
-fluxComponent.submitTopology();
-kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
-kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
-kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
-// verify - ensure the profile is being persisted
-waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-timeout(seconds(180)));
-
-// verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 
value
-List actuals = read(profilerTable.getPutLog(), columnFamily,
-columnBuilder.getColumnQualifier("value"), Double.class);
-
-// verify - there are 3 'HTTP' each with 390 bytes
-Assert.assertTrue(actuals.stream().anyMatch(val ->
-MathUtils.equals(390.0 * 3, val, epsilon)
-));
-  }
-
-  /**
-   * Tests the second example contained within the README.
-   */
-  @Test
-  public void testExample2() throws Exception {
-
-uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
-
-// start the topology and write test messages to kafka
-fluxComponent.submitTopology();
-kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
-kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
-kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
-// expect 2 values written by the profile; one for 10.0.0.2 and 
another for 10.0.0.3
-final int expected = 2;
-
-// verify - ensure the profile is being persisted
-waitOrTimeout(() -> profilerTable.getPutLog().size() >= expected,
-timeout(seconds(90)));
-
-// verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 
'HTTP' and 10.0.0.3 send 'DNS'
-List actuals = read(profilerTable.getPutLog(), 

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-02 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r178600343
  
--- Diff: 
metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
 ---
@@ -310,17 +313,37 @@ public void execute(TupleWindow window) {
   }
 
   /**
-   * Flush all expired profiles when a 'tick' is received.
+   * Flush all active profiles.
+   */
+  protected void flushActive() {
+activeFlushSignal.reset();
+
+// flush the active profiles
+List measurements;
+synchronized(messageDistributor) {
+  measurements = messageDistributor.flush();
+  emitMeasurements(measurements);
+}
+
+LOG.debug("Flushed active profiles and found {} measurement(s).", 
measurements.size());
+
+  }
+
+  /**
+   * Flushes all expired profiles.
*
-   * If a profile has not received a message for an extended period of 
time then it is
+   * If a profile has not received a message for an extended period of 
time then it is
* marked as expired.  Periodically we need to flush these expired 
profiles to ensure
* that their state is not lost.
*/
-  private void handleTick() {
+  protected void flushExpired() {
 
 // flush the expired profiles
-List measurements = 
messageDistributor.flushExpired();
-emitMeasurements(measurements);
+List measurements;
+synchronized (messageDistributor) {
--- End diff --

Access to the `messageDistributor` has to be synchronized now.  It is not 
thread safe and it could be called from either the timer thread or when tuples 
are received.


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-02 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r178600386
  
--- Diff: 
metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
 ---
@@ -339,11 +362,13 @@ private void handleMessage(Tuple input) {
 Long timestamp = getField(TIMESTAMP_TUPLE_FIELD, input, Long.class);
 
 // keep track of time
-flushSignal.update(timestamp);
+activeFlushSignal.update(timestamp);
 
 // distribute the message
 MessageRoute route = new MessageRoute(definition, entity);
-messageDistributor.distribute(message, timestamp, route, 
getStellarContext());
+synchronized (messageDistributor) {
+  messageDistributor.distribute(message, timestamp, route, 
getStellarContext());
+}
--- End diff --

Access to the `messageDistributor` has to be synchronized now.  It is not 
thread safe and it could be called from either the timer thread or when tuples 
are received.


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-02 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r178600285
  
--- Diff: 
metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
 ---
@@ -310,17 +313,37 @@ public void execute(TupleWindow window) {
   }
 
   /**
-   * Flush all expired profiles when a 'tick' is received.
+   * Flush all active profiles.
+   */
+  protected void flushActive() {
+activeFlushSignal.reset();
+
+// flush the active profiles
+List measurements;
+synchronized(messageDistributor) {
--- End diff --

Access to the `messageDistributor` has to be synchronized now.  It is not 
thread safe and it could be called from either the timer thread or when tuples 
are received.


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-02 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r178599870
  
--- Diff: 
metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
 ---
@@ -395,10 +420,46 @@ private void 
emitMeasurements(List measurements) {
 return value;
   }
 
+  /**
+   * Converts milliseconds to seconds and handles an ugly cast.
+   *
+   * @param millis Duration in milliseconds.
+   * @return Duration in seconds.
+   */
+  private int toSeconds(long millis) {
+return (int) TimeUnit.MILLISECONDS.toSeconds(millis);
+  }
+
+  /**
+   * Creates a timer that regularly flushes expired profiles on a separate 
thread.
+   */
+  private void startExpiredFlushTimer() {
+
+expiredFlushTimer = createTimer("flush-expired-profiles-timer");
+expiredFlushTimer.scheduleRecurring(0, 
toSeconds(profileTimeToLiveMillis), () -> flushExpired());
+  }
--- End diff --

This is the timer thread that flushes expired profiles regularly.


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-02 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r178599674
  
--- Diff: 
metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
 ---
@@ -281,29 +289,45 @@ public DefaultMessageDistributor 
withPeriodDuration(int duration, TimeUnit units
   /**
* A listener that is notified when profiles expire from the active 
cache.
*/
-  private class ActiveCacheRemovalListener implements 
RemovalListener<String, ProfileBuilder> {
+  private class ActiveCacheRemovalListener implements 
RemovalListener<Integer, ProfileBuilder> {
 
 @Override
-public void onRemoval(RemovalNotification<String, ProfileBuilder> 
notification) {
+public void onRemoval(RemovalNotification<Integer, ProfileBuilder> 
notification) {
 
-  String key = notification.getKey();
   ProfileBuilder expired = notification.getValue();
+  LOG.warn("Profile expired from active cache; profile={}, entity={}",
+  expired.getDefinition().getProfile(),
+  expired.getEntity());
 
-  LOG.warn("Profile expired from active cache; key={}", key);
-  expiredCache.put(key, expired);
+  // add the profile to the expired cache
+  expiredCache.put(notification.getKey(), expired);
 }
   }
 
   /**
* A listener that is notified when profiles expire from the active 
cache.
*/
-  private class ExpiredCacheRemovalListener implements 
RemovalListener<String, ProfileBuilder> {
+  private class ExpiredCacheRemovalListener implements 
RemovalListener<Integer, ProfileBuilder> {
 
 @Override
-public void onRemoval(RemovalNotification<String, ProfileBuilder> 
notification) {
+public void onRemoval(RemovalNotification<Integer, ProfileBuilder> 
notification) {
+
+  if(notification.wasEvicted()) {
+
+// the expired profile was NOT flushed in time
--- End diff --

A profile being removed from the expired cache is only 'bad' when it is 
evicted.  When an eviction occurs, we get a WARN.  Otherwise, only a DEBUG is 
used.  This makes the logging much more useful when troubleshooting.
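
A minimal sketch of that logging rule, assuming a flush removes entries from the expired cache explicitly.  The `Cause` enum is a hypothetical stand-in that mirrors the removal causes in Guava's `RemovalCause`; `logLevelFor` is an illustrative name and not part of this PR.

```java
public class RemovalLoggingSketch {

  // Hypothetical enum mirroring Guava's RemovalCause values.
  // Only automatic removals (collected, expired, size) count as evictions.
  enum Cause {
    EXPLICIT(false), REPLACED(false), COLLECTED(true), EXPIRED(true), SIZE(true);

    private final boolean evicted;

    Cause(boolean evicted) {
      this.evicted = evicted;
    }

    boolean wasEvicted() {
      return evicted;
    }
  }

  // An eviction means the expired profile was NOT flushed in time, so warn.
  // Any other removal is routine and only worth a debug message.
  static String logLevelFor(Cause cause) {
    return cause.wasEvicted() ? "WARN" : "DEBUG";
  }

  public static void main(String[] args) {
    for (Cause cause : Cause.values()) {
      System.out.println(cause + " -> " + logLevelFor(cause));
    }
  }
}
```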


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-02 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/977#discussion_r178599132
  
--- Diff: 
metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
 ---
@@ -262,11 +262,19 @@ public ProfileBuilder getBuilder(MessageRoute route, 
Context context) throws Exe
   /**
* Builds the key that is used to lookup the {@link ProfileBuilder} 
within the cache.
*
+   * The cache key is built using the hash codes of the profile and 
entity name.  If the profile
+   * definition is ever changed, the same cache entry will not be reused.  
This ensures that no
+   * state can be carried over from the old definition into the new, which 
might result in an
+   * invalid profile measurement.
+   *
* @param profile The profile definition.
* @param entity The entity.
*/
-  private String cacheKey(ProfileConfig profile, String entity) {
-return format("%s:%s", profile, entity);
+  private int cacheKey(ProfileConfig profile, String entity) {
+return new HashCodeBuilder(17, 37)
--- End diff --

The cache key needs to ensure that when the user changes a profile 
definition, even slightly, a different `ProfileBuilder` is used.  Reusing 
the same `ProfileBuilder` would create inconsistent results.

Instead of using `ProfileConfig.toString()` as part of the cache key, it 
now uses the hash code from the profile and the entity.  I think this is less 
error prone and more performant.
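
A rough sketch of the idea, not the code in this PR: `ProfileDef` is a hypothetical stand-in for `ProfileConfig`, and `java.util.Objects.hash` is swapped in for the `HashCodeBuilder` used in the diff so the example needs only the JDK.  It shows that an equal definition and entity reuse the same key, while an edited definition gets a different key and so a separate builder entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class CacheKeySketch {

  // Hypothetical stand-in for ProfileConfig; only equals/hashCode matter here.
  static final class ProfileDef {
    final String name;
    final String expression;

    ProfileDef(String name, String expression) {
      this.name = name;
      this.expression = expression;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof ProfileDef)) {
        return false;
      }
      ProfileDef other = (ProfileDef) o;
      return name.equals(other.name) && expression.equals(other.expression);
    }

    @Override
    public int hashCode() {
      return Objects.hash(name, expression);
    }
  }

  // Combines the hash codes of the profile definition and the entity.
  // Equal inputs always give the same key; different inputs give different
  // keys unless their hash codes happen to collide.
  static int cacheKey(ProfileDef profile, String entity) {
    return Objects.hash(profile, entity);
  }

  public static void main(String[] args) {
    // The lists stand in for the state held by a ProfileBuilder.
    Map<Integer, List<String>> builders = new HashMap<>();
    ProfileDef v1 = new ProfileDef("example", "count + 1");
    ProfileDef v2 = new ProfileDef("example", "count + 2");

    builders.computeIfAbsent(cacheKey(v1, "10.0.0.1"), k -> new ArrayList<>())
        .add("state built under v1");
    builders.computeIfAbsent(cacheKey(v2, "10.0.0.1"), k -> new ArrayList<>())
        .add("state built under v2");

    System.out.println(builders.size() + " separate builder entries");
  }
}
```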


---


[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

2018-04-02 Thread nickwallen
GitHub user nickwallen opened a pull request:

https://github.com/apache/metron/pull/977

METRON-1505 Intermittent Profiler Integration Test Failure

### Problem

The integration tests were failing intermittently when Storm unexpectedly 
expired messages generated by the integration tests.  When Storm expired these 
messages they were never received by the Profiler bolts, which caused the 
integration tests to fail.

### Root Cause

Storm's event window mechanism was not configured correctly to use the 
timestamp extracted from the telemetry message.  Storm was instead defaulting 
to system time.  

If the time when the downstream `ProfileBuilderBolt` processed a message 
differed significantly enough from when the upstream `ProfileSplitterBolt` 
processed the message, the message would be errantly expired by Storm.  

This is why the problem could only be replicated when run in Travis, a 
resource-constrained environment.  When run in any other environment, the 
system time when these two events occur will not differ enough for Storm to 
mistakenly expire the test messages.

This did not necessarily matter for the core functioning of the Profiler, 
as the Profiler itself continued to use the correct event timestamps.  This bug 
only affected significantly out-of-order messages and the flushing of expired 
profiles for the integration tests.

### The Fix

The simple fix was to ensure that Storm uses the correct event timestamp 
field.  Doing this highlighted another problem.  Storm does not work correctly 
when using tick tuples along with an event timestamp field.  Storm will attempt 
to extract an event timestamp from the tick tuple; the field does not exist 
there, which causes the entire topology to fail.

This meant that I could not use tick tuples.  To work around this, I 
created a separate thread that flushes the expired profiles regularly.  The 
separate thread introduces thread safety concerns, so I also needed to perform 
some locking.
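
A simplified sketch of this workaround, under stated assumptions: a JDK `ScheduledExecutorService` is swapped in for the timer the bolt actually uses, and `Distributor` is a hypothetical, deliberately non-thread-safe stand-in for the message distributor.  Every access goes through the same lock, so the flush thread and the message-handling thread never touch the shared state at the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FlushTimerSketch {

  // Hypothetical stand-in for the distributor; NOT thread safe on its own.
  static final class Distributor {
    private final List<String> pending = new ArrayList<>();

    void distribute(String message) {
      pending.add(message);
    }

    List<String> flushExpired() {
      List<String> flushed = new ArrayList<>(pending);
      pending.clear();
      return flushed;
    }
  }

  private final Distributor distributor = new Distributor();
  private final List<String> emitted = new ArrayList<>(); // guarded by the distributor lock
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();

  // Starts a separate thread that flushes at a fixed interval.
  void start(long periodMillis) {
    timer.scheduleAtFixedRate(this::flushExpired, 0, periodMillis, TimeUnit.MILLISECONDS);
  }

  // Called on the thread that receives messages.
  void handleMessage(String message) {
    synchronized (distributor) {
      distributor.distribute(message);
    }
  }

  // Called on the timer thread; takes the same lock as handleMessage.
  void flushExpired() {
    synchronized (distributor) {
      emitted.addAll(distributor.flushExpired());
    }
  }

  int emittedCount() {
    synchronized (distributor) {
      return emitted.size();
    }
  }

  void stop() {
    timer.shutdownNow();
  }
}
```

Locking on the shared object keeps the sketch short; a dedicated lock object would work equally well.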

### Changes

Most of these changes were done in separate commits to make review easier.

1. Added a separate thread to the `ProfileBuilderBolt` to flush expired 
profiles regularly.  This is the core fix to the integration test bug.

2. Corrected the key generated to cache `ProfileBuilder` objects.  This 
previously relied on the underlying `ProfileConfig.toString` method which was 
error-prone and slow.  It now uses the hash codes of the profile and the entity.

3. Reduced the number of Profiler integration tests.  There is now one 
integration test that tests event time processing and another that tests the 
same profile using processing time.

Previously there were a number of different profiles that were tested.  
This was necessary before as the integration tests were the only effective way 
to test different profile logic.  Since then, significant refactoring has 
occurred which allowed the same logic to be tested in unit tests rather than in 
integration tests.  

This allowed me to clean up these tests, which reduces the run time and 
complexity of the integration tests.

4. Added some simple debug logging to `HBaseBolt`.

## Pull Request Checklist

- [ ] Is there a JIRA ticket associated with this PR? If not one needs to 
be created at [Metron 
Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
- [ ] Does your PR title start with METRON-XXXX where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?
- [ ] Have you included steps to reproduce the behavior or problem that is 
being changed or addressed?
- [ ] Have you included steps or a guide to how the change may be verified 
and tested manually?
- [ ] Have you ensured that the full suite of tests and checks have been 
executed in the root metron folder
- [ ] Have you written or updated unit tests and or integration tests to 
verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] Have you verified the basic functionality of the build by building 
and running locally with Vagrant full-dev environment or the equivalent?




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nickwallen/metron METRON-1505

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/metron/pull/977.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #977


commit