[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607087#comment-16607087 ] ASF GitHub Bot commented on METRON-1748:

Github user nickwallen commented on the issue:

https://github.com/apache/metron/pull/1174

Great. I appreciate the review. Very helpful feedback.

> Improve Storm Profiler Integration Test
> ---
>
> Key: METRON-1748
> URL: https://issues.apache.org/jira/browse/METRON-1748
> Project: Metron
> Issue Type: Bug
> Reporter: Nick Allen
> Assignee: Nick Allen
> Priority: Major
>
> We should use the Profiler Client, like PROFILE_GET, to validate the output
> of the Storm Profiler Integration Test. This is better validation that
> things are working end-to-end.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607088#comment-16607088 ] ASF GitHub Bot commented on METRON-1748:

Github user asfgit closed the pull request at:

https://github.com/apache/metron/pull/1174
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606418#comment-16606418 ] ASF GitHub Bot commented on METRON-1748:

Github user mmiklavc commented on the issue:

https://github.com/apache/metron/pull/1174

This looks great @nickwallen. Thanks for the contribution and for taking the time with my feedback. +1
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606415#comment-16606415 ] ASF GitHub Bot commented on METRON-1748:

Github user mmiklavc commented on a diff in the pull request:

https://github.com/apache/metron/pull/1174#discussion_r215785684

--- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
@@ -127,94 +127,53 @@
@Multiline
private static String kryoSerializers;
- /**
- * The Profiler can generate profiles based on processing time. With processing time,
- * the Profiler builds profiles based on when the telemetry is processed.
- *
- * Not defining a 'timestampField' within the Profiler configuration tells the Profiler
- * to use processing time.
- *
- * There are two mechanisms that will cause a profile to flush.
- *
- * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each
- * message (which can be either event or system time.) This advances time and leads to profile
- * measurements being flushed.
- *
- * (2) If no messages arrive to advance time, then the "time-to-live" mechanism will flush a profile
- * after a period of time.
- *
- * This test specifically tests the *first* mechanism where time is advanced by incoming messages.
- */
@Test
public void testProcessingTime() throws Exception {
+uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
-// upload the config to zookeeper
-uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
-
-// start the topology and write test messages to kafka
+// start the topology and write 3 test messages to kafka
fluxComponent.submitTopology();
-
-// the messages that will be applied to the profile
kafkaComponent.writeMessages(inputTopic, message1);
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
// retrieve the profile measurement using PROFILE_GET
String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
-List actuals = execute(profileGetExpression, List.class);
+List measurements = execute(profileGetExpression, List.class);
-// storm needs at least one message to close its event window
+// need to keep checking for measurements until the profiler has flushed one out
int attempt = 0;
-while(actuals.size() == 0 && attempt++ < 10) {
+while(measurements.size() == 0 && attempt++ < 10) {
// wait for the profiler to flush
long sleep = windowDurationMillis;
LOG.debug("Waiting {} millis for profiler to flush", sleep);
Thread.sleep(sleep);
- // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
+ // write another message to advance time. this ensures we are testing the 'normal' flush mechanism.
// if we do not send additional messages to advance time, then it is the profile TTL mechanism which
// will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
- // retrieve the profile measurement using PROFILE_GET
- actuals = execute(profileGetExpression, List.class);
+ // try again to retrieve the profile measurement using PROFILE_GET
+ measurements = execute(profileGetExpression, List.class);
}
-// the profile should count at least 3 messages
-assertTrue(actuals.size() > 0);
-assertTrue(actuals.get(0) >= 3);
+// expect to see only 1 measurement, but could be more (one for each period) depending on
--- End diff --

Ok, that makes sense - that's more what I meant when I said "period" in my original inquiry. It's a measurement that is associated with the k-th period.
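To make the "k-th period" idea concrete, here is a minimal sketch assuming fixed-length periods aligned to the Unix epoch, so that a period's index is simply the timestamp divided by the period duration. The class `PeriodSketch` and the 15-minute duration are hypothetical and not taken from the test's profile configuration; the timestamp is the `lastTimestamp` value used in the event-time test. Two timestamps on either side of a period boundary get different indexes, which is one way the retry loop could yield more than one measurement.

```java
import java.util.concurrent.TimeUnit;

public class PeriodSketch {

    // Index of the fixed-length period containing `epochMillis`.
    // Assumption: periods are aligned to the Unix epoch, so the index is plain integer division.
    static long periodOf(long epochMillis, long periodDurationMillis) {
        return epochMillis / periodDurationMillis;
    }

    // First millisecond covered by period number `period`.
    static long periodStart(long period, long periodDurationMillis) {
        return period * periodDurationMillis;
    }

    public static void main(String[] args) {
        long duration = TimeUnit.MINUTES.toMillis(15);      // hypothetical period duration
        long first = 1530978728982L;                        // lastTimestamp from the event-time test
        long later = first + TimeUnit.MINUTES.toMillis(20); // 20 minutes later

        long k = periodOf(first, duration);
        // The two timestamps land in consecutive periods, so a profile that saw
        // both would be expected to produce one measurement per period.
        System.out.println(k);                              // 1701087
        System.out.println(periodOf(later, duration));      // 1701088
        System.out.println(periodStart(k, duration));       // 1530978300000
    }
}
```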
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606345#comment-16606345 ] ASF GitHub Bot commented on METRON-1748:

Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1174#discussion_r215761683

--- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
@@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
+// retrieve the profile measurement using PROFILE_GET
+String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+List actuals = execute(profileGetExpression, List.class);
+
// storm needs at least one message to close its event window
int attempt = 0;
-while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+while(actuals.size() == 0 && attempt++ < 10) {
- // sleep, at least beyond the current window
- Thread.sleep(windowDurationMillis + windowLagMillis);
+ // wait for the profiler to flush
+ long sleep = windowDurationMillis;
+ LOG.debug("Waiting {} millis for profiler to flush", sleep);
+ Thread.sleep(sleep);
- // send another message to help close the current event window
+ // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
+ // will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
+
+ // retrieve the profile measurement using PROFILE_GET
+ actuals = execute(profileGetExpression, List.class);
}
-// validate what was flushed
-List actuals = read(
-profilerTable.getPutLog(),
-columnFamily,
-columnBuilder.getColumnQualifier("value"),
-Integer.class);
-assertEquals(1, actuals.size());
+// the profile should count at least 3 messages
+assertTrue(actuals.size() > 0);
--- End diff --

> I looked at the profiler.json in test resources and it looks like it's counting the number of messages that have ip_src_addr. Is that correct? Under what condition(s) would actuals.size() != 1 for the purposes of this test?

Hopefully the latest commit clears up your questions around this.
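The retry loop in this hunk (query, sleep, write another message to advance time, query again, at most 10 retries) could be factored into a small helper. This is a sketch only: `PollingSketch.pollUntil` and its parameters are hypothetical names rather than part of Metron's test utilities, and the `Supplier` stands in for `execute(profileGetExpression, List.class)`.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class PollingSketch {

    // Evaluates `query` once, then keeps retrying until `done` accepts the result
    // or `maxAttempts` retries have been made. Before each retry it sleeps for
    // `sleepMillis` and runs `betweenAttempts` (in the test, writing another
    // message to advance time). Returns the last result seen, which may still
    // fail `done` if the attempts ran out.
    static <T> T pollUntil(Supplier<T> query,
                           Predicate<T> done,
                           Runnable betweenAttempts,
                           int maxAttempts,
                           long sleepMillis) throws InterruptedException {
        T result = query.get();
        int attempt = 0;
        while (!done.test(result) && attempt++ < maxAttempts) {
            Thread.sleep(sleepMillis);
            betweenAttempts.run();
            result = query.get();
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for PROFILE_GET: returns no measurements until the third call.
        int[] calls = {0};
        Supplier<List<Integer>> fakeProfileGet = () ->
            ++calls[0] >= 3 ? Collections.singletonList(3) : Collections.<Integer>emptyList();

        List<Integer> measurements =
            pollUntil(fakeProfileGet, m -> !m.isEmpty(), () -> { }, 10, 10L);

        System.out.println(measurements + " after " + calls[0] + " queries"); // [3] after 3 queries
    }
}
```

Passing the "advance time" step in as a `Runnable` keeps the helper neutral about which flush mechanism a test exercises: a no-op leaves only the time-to-live flush, while writing a message exercises the normal flush.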
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606344#comment-16606344 ] ASF GitHub Bot commented on METRON-1748:

Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1174#discussion_r215761476

--- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
@@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
+// retrieve the profile measurement using PROFILE_GET
+String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+List actuals = execute(profileGetExpression, List.class);
+
// storm needs at least one message to close its event window
int attempt = 0;
-while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+while(actuals.size() == 0 && attempt++ < 10) {
- // sleep, at least beyond the current window
- Thread.sleep(windowDurationMillis + windowLagMillis);
+ // wait for the profiler to flush
+ long sleep = windowDurationMillis;
+ LOG.debug("Waiting {} millis for profiler to flush", sleep);
+ Thread.sleep(sleep);
- // send another message to help close the current event window
+ // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
+ // will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
+
+ // retrieve the profile measurement using PROFILE_GET
+ actuals = execute(profileGetExpression, List.class);
}
-// validate what was flushed
-List actuals = read(
-profilerTable.getPutLog(),
-columnFamily,
-columnBuilder.getColumnQualifier("value"),
-Integer.class);
-assertEquals(1, actuals.size());
+// the profile should count at least 3 messages
+assertTrue(actuals.size() > 0);
--- End diff --

> I could certainly add that. This is just how the tests were originally written.

So I looked at this. I'd rather not do this now, only because the ConfigUploader component doesn't just let me pass it a String to upload to Zk. I need to give it a path. But I would definitely like to come back to this and embed the profile definitions in the test like you suggested.
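One possible workaround for an uploader that only accepts a path is to keep the profile definition as a string in the test and write it to a temporary directory first. A minimal sketch, assuming the uploader reads a `profiler.json` from the directory it is given (as the existing `config/zookeeper/processing-time-test` resources suggest); the class name and the sample profile are hypothetical, and the sample is not the test's actual `profiler.json`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class InlineProfileConfig {

    // Hypothetical profile definition kept next to the test code instead of in a
    // resource file. Illustrative only; not the test's actual profiler.json.
    static final String PROFILER_JSON =
          "{\n"
        + "  \"profiles\": [{\n"
        + "    \"profile\": \"processing-time-test\",\n"
        + "    \"foreach\": \"ip_src_addr\",\n"
        + "    \"init\":   { \"count\": \"0\" },\n"
        + "    \"update\": { \"count\": \"count + 1\" },\n"
        + "    \"result\": \"count\"\n"
        + "  }]\n"
        + "}\n";

    // Writes `profilerJson` to <new temp dir>/profiler.json and returns the directory,
    // so an uploader that only accepts a path can still consume an inline definition.
    static Path writeProfilerConfig(String profilerJson) throws IOException {
        Path dir = Files.createTempDirectory("profiler-config");
        Path file = dir.resolve("profiler.json");
        Files.write(file, profilerJson.getBytes(StandardCharsets.UTF_8));
        // deleteOnExit runs in reverse registration order, so the file goes before its directory.
        dir.toFile().deleteOnExit();
        file.toFile().deleteOnExit();
        return dir;
    }

    public static void main(String[] args) throws IOException {
        Path dir = writeProfilerConfig(PROFILER_JSON);
        // In the test, this directory would be handed to the (hypothetical) upload call,
        // e.g. uploadConfigToZookeeper(dir.toString()).
        System.out.println(dir.resolve("profiler.json"));
    }
}
```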
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606342#comment-16606342 ] ASF GitHub Bot commented on METRON-1748:

Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1174#discussion_r215760609

--- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
@@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
+// retrieve the profile measurement using PROFILE_GET
+String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+List actuals = execute(profileGetExpression, List.class);
+
// storm needs at least one message to close its event window
int attempt = 0;
-while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+while(actuals.size() == 0 && attempt++ < 10) {
- // sleep, at least beyond the current window
- Thread.sleep(windowDurationMillis + windowLagMillis);
+ // wait for the profiler to flush
+ long sleep = windowDurationMillis;
+ LOG.debug("Waiting {} millis for profiler to flush", sleep);
+ Thread.sleep(sleep);
- // send another message to help close the current event window
+ // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
+ // will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
+
+ // retrieve the profile measurement using PROFILE_GET
+ actuals = execute(profileGetExpression, List.class);
}
-// validate what was flushed
-List actuals = read(
-profilerTable.getPutLog(),
-columnFamily,
-columnBuilder.getColumnQualifier("value"),
-Integer.class);
-assertEquals(1, actuals.size());
+// the profile should count at least 3 messages
+assertTrue(actuals.size() > 0);
assertTrue(actuals.get(0) >= 3);
}
+ /**
+ * The Profiler can generate profiles based on processing time. With processing time,
+ * the Profiler builds profiles based on when the telemetry is processed.
+ *
+ * Not defining a 'timestampField' within the Profiler configuration tells the Profiler
+ * to use processing time.
+ *
+ * There are two mechanisms that will cause a profile to flush.
+ *
+ * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each
+ * message (which can be either event or system time.) This advances time and leads to profile
+ * measurements being flushed.
+ *
+ * (2) If no messages arrive to advance time, then the "time to live" mechanism will flush a profile
+ * after a period of time.
+ *
+ * This test specifically tests the *second* mechanism when a profile is flushed by the
+ * "time to live" mechanism.
+ */
+ @Test
+ public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
+
+// upload the config to zookeeper
--- End diff --

Done
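The two flush mechanisms described in the javadoc can be modeled in a few lines. This is a toy sketch, not the Profiler bolt's implementation: `FlushSketch`, its count-only "profile", and the explicit `now` clock argument are all hypothetical. A message whose timestamp falls in a later period flushes the pending measurement (mechanism 1), and a periodic check flushes it once nothing has arrived for the time-to-live (mechanism 2).

```java
import java.util.ArrayList;
import java.util.List;

public class FlushSketch {

    private final long periodMillis;  // length of one profile period
    private final long ttlMillis;     // flush if no message has arrived for this long
    private long currentPeriod = -1;  // period currently being accumulated
    private long count = 0;           // the toy "profile": a message count
    private long lastSeenAt = 0;      // clock reading when the last message arrived
    final List<String> flushed = new ArrayList<>();

    FlushSketch(long periodMillis, long ttlMillis) {
        this.periodMillis = periodMillis;
        this.ttlMillis = ttlMillis;
    }

    // Mechanism (1): the timestamp attached to a message (event or system time)
    // advances time. A message from a later period flushes the pending measurement.
    // Late (out-of-order) messages get no special handling in this toy model.
    void onMessage(long timestamp, long now) {
        long period = timestamp / periodMillis;
        if (count > 0 && period > currentPeriod) {
            flush("time advanced");
        }
        if (count == 0) {
            currentPeriod = period;
        }
        count++;
        lastSeenAt = now;
    }

    // Mechanism (2): called periodically; if nothing has arrived for ttlMillis,
    // the pending measurement is flushed by the time-to-live check.
    void onTick(long now) {
        if (count > 0 && now - lastSeenAt >= ttlMillis) {
            flush("time to live");
        }
    }

    private void flush(String reason) {
        flushed.add("period " + currentPeriod + ": count=" + count + " (" + reason + ")");
        count = 0;
    }

    public static void main(String[] args) {
        FlushSketch profile = new FlushSketch(1_000, 5_000);
        profile.onMessage(100, 100);     // three messages in period 0
        profile.onMessage(200, 200);
        profile.onMessage(300, 300);
        profile.onMessage(1_500, 1_500); // period 1 arrives: period 0 flushed by mechanism (1)
        profile.onTick(7_000);           // 5.5 s of silence: period 1 flushed by mechanism (2)
        profile.flushed.forEach(System.out::println);
    }
}
```

Running `main` prints the period-0 measurement flushed by mechanism (1), followed by the period-1 measurement flushed by mechanism (2).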
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606341#comment-16606341 ] ASF GitHub Bot commented on METRON-1748:

Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1174#discussion_r215760574

--- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
@@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
+// retrieve the profile measurement using PROFILE_GET
+String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+List actuals = execute(profileGetExpression, List.class);
+
// storm needs at least one message to close its event window
int attempt = 0;
-while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+while(actuals.size() == 0 && attempt++ < 10) {
- // sleep, at least beyond the current window
- Thread.sleep(windowDurationMillis + windowLagMillis);
+ // wait for the profiler to flush
+ long sleep = windowDurationMillis;
+ LOG.debug("Waiting {} millis for profiler to flush", sleep);
+ Thread.sleep(sleep);
- // send another message to help close the current event window
+ // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
+ // will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
+
+ // retrieve the profile measurement using PROFILE_GET
+ actuals = execute(profileGetExpression, List.class);
}
-// validate what was flushed
-List actuals = read(
-profilerTable.getPutLog(),
-columnFamily,
-columnBuilder.getColumnQualifier("value"),
-Integer.class);
-assertEquals(1, actuals.size());
+// the profile should count at least 3 messages
+assertTrue(actuals.size() > 0);
assertTrue(actuals.get(0) >= 3);
}
+ /**
+ * The Profiler can generate profiles based on processing time. With processing time,
+ * the Profiler builds profiles based on when the telemetry is processed.
+ *
+ * Not defining a 'timestampField' within the Profiler configuration tells the Profiler
+ * to use processing time.
+ *
+ * There are two mechanisms that will cause a profile to flush.
+ *
+ * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each
+ * message (which can be either event or system time.) This advances time and leads to profile
+ * measurements being flushed.
+ *
+ * (2) If no messages arrive to advance time, then the "time to live" mechanism will flush a profile
+ * after a period of time.
+ *
+ * This test specifically tests the *second* mechanism when a profile is flushed by the
+ * "time to live" mechanism.
+ */
+ @Test
+ public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
--- End diff --

I removed the javadocs from the integration test and put them in the bolt's javadoc. They seemed to fit there nicely. Hopefully this satisfies your valid point around DRY. But let me know if there is something else I can do.
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605934#comment-16605934 ] ASF GitHub Bot commented on METRON-1748:

Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1174#discussion_r215673075

--- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
@@ -186,21 +266,40 @@ public void testEventTime() throws Exception {
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
-kafkaComponent.writeMessages(inputTopic, message1);
-kafkaComponent.writeMessages(inputTopic, message2);
-kafkaComponent.writeMessages(inputTopic, message3);
-
-// wait until the profile is flushed
-waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90)));
-
-List puts = profilerTable.getPutLog();
-assertEquals(1, puts.size());
-
-// inspect the row key to ensure the profiler used event time correctly. the timestamp
-// embedded in the row key should match those in the source telemetry
-byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt);
-byte[] actualRowKey = puts.get(0).getRow();
-assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey);
+List messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
+kafkaComponent.writeMessages(inputTopic, messages);
+
+long timestamp = System.currentTimeMillis();
+LOG.debug("Attempting to close window period by sending message with timestamp = {}", timestamp);
+kafkaComponent.writeMessages(inputTopic, getMessage("192.168.66.1", timestamp));
+kafkaComponent.writeMessages(inputTopic, getMessage("192.168.138.158", timestamp));
+
+// create the 'window' that looks up to 5 hours before the last timestamp contained in the telemetry
+assign("lastTimestamp", "1530978728982L");
--- End diff --

Really what I mean is it's the latest/most recent timestamp. The records are not expected to be in order.
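Because the telemetry is not sorted, `lastTimestamp` has to be the maximum timestamp across all records rather than the timestamp on the final line. A minimal sketch of deriving it, and the start of the 5-hour lookback, assuming each line is a JSON object with a numeric `timestamp` field; `LatestTimestamp` and its helpers are hypothetical, the sample records are made up, and a real test might use a JSON parser instead of a regular expression.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LatestTimestamp {

    // Assumption: each telemetry line is a JSON object with a numeric "timestamp" field.
    private static final Pattern TIMESTAMP = Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    // Largest timestamp across all lines. Line order is irrelevant, so this works
    // even when the telemetry is not sorted by time. Returns Long.MIN_VALUE if none is found.
    static long latestTimestamp(List<String> lines) {
        long latest = Long.MIN_VALUE;
        for (String line : lines) {
            Matcher m = TIMESTAMP.matcher(line);
            if (m.find()) {
                latest = Math.max(latest, Long.parseLong(m.group(1)));
            }
        }
        return latest;
    }

    // Start of a lookback window that ends at `end` and spans `hours` hours.
    static long windowStart(long end, long hours) {
        return end - TimeUnit.HOURS.toMillis(hours);
    }

    public static void main(String[] args) {
        // Made-up, deliberately unsorted records; not taken from telemetry.json.
        List<String> lines = Arrays.asList(
            "{\"ip_src_addr\":\"192.168.66.1\",\"timestamp\":1530978700000}",
            "{\"ip_src_addr\":\"192.168.138.158\",\"timestamp\":1530978728982}",
            "{\"ip_src_addr\":\"192.168.66.1\",\"timestamp\":1530978710000}");

        long latest = latestTimestamp(lines);
        System.out.println(latest);                 // 1530978728982
        System.out.println(windowStart(latest, 5)); // 1530960728982
    }
}
```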
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605892#comment-16605892 ] ASF GitHub Bot commented on METRON-1748:

Github user mmiklavc commented on a diff in the pull request:

https://github.com/apache/metron/pull/1174#discussion_r215660252

--- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
@@ -186,21 +266,40 @@ public void testEventTime() throws Exception {
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
-kafkaComponent.writeMessages(inputTopic, message1);
-kafkaComponent.writeMessages(inputTopic, message2);
-kafkaComponent.writeMessages(inputTopic, message3);
-
-// wait until the profile is flushed
-waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90)));
-
-List puts = profilerTable.getPutLog();
-assertEquals(1, puts.size());
-
-// inspect the row key to ensure the profiler used event time correctly. the timestamp
-// embedded in the row key should match those in the source telemetry
-byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt);
-byte[] actualRowKey = puts.get(0).getRow();
-assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey);
+List messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
+kafkaComponent.writeMessages(inputTopic, messages);
+
+long timestamp = System.currentTimeMillis();
+LOG.debug("Attempting to close window period by sending message with timestamp = {}", timestamp);
+kafkaComponent.writeMessages(inputTopic, getMessage("192.168.66.1", timestamp));
+kafkaComponent.writeMessages(inputTopic, getMessage("192.168.138.158", timestamp));
+
+// create the 'window' that looks up to 5 hours before the last timestamp contained in the telemetry
+assign("lastTimestamp", "1530978728982L");
--- End diff --

On looking at that data file more closely, it looks like the records are not in time order. That TS shows up on line 92 in the telemetry file rather than the last line. Is that expected? I think I'm misunderstanding something. Can you elaborate on that a bit more?
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605763#comment-16605763 ] ASF GitHub Bot commented on METRON-1748:

Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1174#discussion_r215621040

--- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
@@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
+// retrieve the profile measurement using PROFILE_GET
+String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+List actuals = execute(profileGetExpression, List.class);
+
// storm needs at least one message to close its event window
int attempt = 0;
-while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+while(actuals.size() == 0 && attempt++ < 10) {
- // sleep, at least beyond the current window
- Thread.sleep(windowDurationMillis + windowLagMillis);
+ // wait for the profiler to flush
+ long sleep = windowDurationMillis;
+ LOG.debug("Waiting {} millis for profiler to flush", sleep);
+ Thread.sleep(sleep);
- // send another message to help close the current event window
+ // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
+ // will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
+
+ // retrieve the profile measurement using PROFILE_GET
+ actuals = execute(profileGetExpression, List.class);
}
-// validate what was flushed
-List actuals = read(
-profilerTable.getPutLog(),
-columnFamily,
-columnBuilder.getColumnQualifier("value"),
-Integer.class);
-assertEquals(1, actuals.size());
+// the profile should count at least 3 messages
+assertTrue(actuals.size() > 0);
assertTrue(actuals.get(0) >= 3);
}
+ /**
+ * The Profiler can generate profiles based on processing time. With processing time,
+ * the Profiler builds profiles based on when the telemetry is processed.
+ *
+ * Not defining a 'timestampField' within the Profiler configuration tells the Profiler
+ * to use processing time.
+ *
+ * There are two mechanisms that will cause a profile to flush.
+ *
+ * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each
+ * message (which can be either event or system time.) This advances time and leads to profile
+ * measurements being flushed.
+ *
+ * (2) If no messages arrive to advance time, then the "time to live" mechanism will flush a profile
+ * after a period of time.
+ *
+ * This test specifically tests the *second* mechanism when a profile is flushed by the
+ * "time to live" mechanism.
+ */
+ @Test
+ public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
--- End diff --

Ok, fair point. Let me see if I can't move this to the README and clean it up.
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605761#comment-16605761 ] ASF GitHub Bot commented on METRON-1748:

Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/1174#discussion_r215620641

--- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
@@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception {
kafkaComponent.writeMessages(inputTopic, message2);
kafkaComponent.writeMessages(inputTopic, message3);
+// retrieve the profile measurement using PROFILE_GET
+String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+List actuals = execute(profileGetExpression, List.class);
+
// storm needs at least one message to close its event window
int attempt = 0;
-while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+while(actuals.size() == 0 && attempt++ < 10) {
- // sleep, at least beyond the current window
- Thread.sleep(windowDurationMillis + windowLagMillis);
+ // wait for the profiler to flush
+ long sleep = windowDurationMillis;
+ LOG.debug("Waiting {} millis for profiler to flush", sleep);
+ Thread.sleep(sleep);
- // send another message to help close the current event window
+ // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
+ // will ultimately flush the profile
kafkaComponent.writeMessages(inputTopic, message2);
+
+ // retrieve the profile measurement using PROFILE_GET
+ actuals = execute(profileGetExpression, List.class);
}
-// validate what was flushed
-List actuals = read(
-profilerTable.getPutLog(),
-columnFamily,
-columnBuilder.getColumnQualifier("value"),
-Integer.class);
-assertEquals(1, actuals.size());
+// the profile should count at least 3 messages
+assertTrue(actuals.size() > 0);
--- End diff --

> Along those lines, as a small recommendation for readability (by no means a firm requirement here), but you might consider using the multiline string annotation that we've leveraged in other areas of code.

I could certainly add that. This is just how the tests were originally written.
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605714#comment-16605714 ] ASF GitHub Bot commented on METRON-1748: Github user nickwallen commented on a diff in the pull request: https://github.com/apache/metron/pull/1174#discussion_r215606430 --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java --- @@ -186,21 +266,40 @@ public void testEventTime() throws Exception { // start the topology and write test messages to kafka fluxComponent.submitTopology(); -kafkaComponent.writeMessages(inputTopic, message1); -kafkaComponent.writeMessages(inputTopic, message2); -kafkaComponent.writeMessages(inputTopic, message3); - -// wait until the profile is flushed -waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); - -List puts = profilerTable.getPutLog(); -assertEquals(1, puts.size()); - -// inspect the row key to ensure the profiler used event time correctly. 
the timestamp -// embedded in the row key should match those in the source telemetry -byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt); -byte[] actualRowKey = puts.get(0).getRow(); -assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); +List messages = FileUtils.readLines(new File("src/test/resources/telemetry.json")); +kafkaComponent.writeMessages(inputTopic, messages); + +long timestamp = System.currentTimeMillis(); +LOG.debug("Attempting to close window period by sending message with timestamp = {}", timestamp); +kafkaComponent.writeMessages(inputTopic, getMessage("192.168.66.1", timestamp)); +kafkaComponent.writeMessages(inputTopic, getMessage("192.168.138.158", timestamp)); + +// create the 'window' that looks up to 5 hours before the last timestamp contained in the telemetry +assign("lastTimestamp", "1530978728982L"); --- End diff -- Yes, that is the last timestamp contained in the telemetry. The comment was meant to make that point, but maybe it's not clear enough?
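Here is a quick check of the arithmetic behind that comment. A window that "looks up to 5 hours before" `lastTimestamp` spans [lastTimestamp - 5h, lastTimestamp]. The sketch below computes those bounds. For illustration, it assumes each profile period is numbered by integer division of epoch milliseconds by the period duration, and it lists the period numbers the window touches. The 15-minute period duration is hypothetical, because this thread does not show the test's actual period setting. The class and method names are also hypothetical.

```java
import java.util.concurrent.TimeUnit;

public class LookbackWindowSketch {

  /** Start of a lookback window that ends at lastTimestamp. */
  static long windowStart(long lastTimestamp, long lookbackMillis) {
    return lastTimestamp - lookbackMillis;
  }

  /** Assumed period numbering: integer division of epoch millis by the period duration. */
  static long periodId(long epochMillis, long periodDurationMillis) {
    return epochMillis / periodDurationMillis;
  }

  public static void main(String[] args) {
    long lastTimestamp = 1530978728982L;                  // last timestamp in telemetry.json, per the thread
    long lookback = TimeUnit.HOURS.toMillis(5);           // "up to 5 hours before"
    long periodDuration = TimeUnit.MINUTES.toMillis(15);  // hypothetical period duration

    long start = windowStart(lastTimestamp, lookback);
    long firstPeriod = periodId(start, periodDuration);
    long lastPeriod = periodId(lastTimestamp, periodDuration);

    System.out.println("window = [" + start + ", " + lastTimestamp + "]");
    System.out.println("periods " + firstPeriod + " .. " + lastPeriod
        + " (" + (lastPeriod - firstPeriod + 1) + " periods)");
  }
}
```

With these inputs, the program prints a window of [1530960728982, 1530978728982] covering periods 1701067 through 1701087. That is 21 periods rather than 20, because both edges of the window fall partway through a period.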
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605704#comment-16605704 ] ASF GitHub Bot commented on METRON-1748: Github user nickwallen commented on a diff in the pull request: https://github.com/apache/metron/pull/1174#discussion_r215603987 --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java --- @@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception { kafkaComponent.writeMessages(inputTopic, message2); kafkaComponent.writeMessages(inputTopic, message3); +// retrieve the profile measurement using PROFILE_GET +String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; +List actuals = execute(profileGetExpression, List.class); + // storm needs at least one message to close its event window int attempt = 0; -while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { +while(actuals.size() == 0 && attempt++ < 10) { - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); + // wait for the profiler to flush + long sleep = windowDurationMillis; + LOG.debug("Waiting {} millis for profiler to flush", sleep); + Thread.sleep(sleep); - // send another message to help close the current event window + // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism. 
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which + // will ultimately flush the profile kafkaComponent.writeMessages(inputTopic, message2); + + // retrieve the profile measurement using PROFILE_GET + actuals = execute(profileGetExpression, List.class); } -// validate what was flushed -List actuals = read( -profilerTable.getPutLog(), -columnFamily, -columnBuilder.getColumnQualifier("value"), -Integer.class); -assertEquals(1, actuals.size()); +// the profile should count at least 3 messages +assertTrue(actuals.size() > 0); --- End diff -- > For my own understanding, is the List that's returned as actuals a List of profile periods? it is a list of profile measurements; retrieve the profile measurement using PROFILE_GET
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605700#comment-16605700 ] ASF GitHub Bot commented on METRON-1748: Github user nickwallen commented on a diff in the pull request: https://github.com/apache/metron/pull/1174#discussion_r215603116 --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java --- @@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception { kafkaComponent.writeMessages(inputTopic, message2); kafkaComponent.writeMessages(inputTopic, message3); +// retrieve the profile measurement using PROFILE_GET +String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; +List actuals = execute(profileGetExpression, List.class); + // storm needs at least one message to close its event window int attempt = 0; -while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { +while(actuals.size() == 0 && attempt++ < 10) { - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); + // wait for the profiler to flush + long sleep = windowDurationMillis; + LOG.debug("Waiting {} millis for profiler to flush", sleep); + Thread.sleep(sleep); - // send another message to help close the current event window + // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism. 
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which + // will ultimately flush the profile kafkaComponent.writeMessages(inputTopic, message2); + + // retrieve the profile measurement using PROFILE_GET + actuals = execute(profileGetExpression, List.class); } -// validate what was flushed -List actuals = read( -profilerTable.getPutLog(), -columnFamily, -columnBuilder.getColumnQualifier("value"), -Integer.class); -assertEquals(1, actuals.size()); +// the profile should count at least 3 messages +assertTrue(actuals.size() > 0); assertTrue(actuals.get(0) >= 3); } + /** + * The Profiler can generate profiles based on processing time. With processing time, + * the Profiler builds profiles based on when the telemetry is processed. --- End diff -- The same.
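The javadoc in this diff says that leaving out `timestampField` makes the Profiler use processing time. The reply treats processing time and system time as the same thing. Below is a minimal sketch of that selection rule, with several assumptions. Messages are plain maps. Processing time is whatever the injected clock returns when the message is handled; in production this would be `System::currentTimeMillis`. A message lacking a numeric timestamp field is simply rejected, and how the real Profiler handles such messages is not covered here. `TimestampSelector` and `timestampOf` are hypothetical names, not Metron's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongSupplier;

public class TimestampSelector {

  private final Optional<String> timestampField;  // empty => use processing time
  private final LongSupplier clock;               // e.g. System::currentTimeMillis; injectable for tests

  TimestampSelector(Optional<String> timestampField, LongSupplier clock) {
    this.timestampField = timestampField;
    this.clock = clock;
  }

  /** Event time from the configured field, or processing time when no field is configured. */
  long timestampOf(Map<String, Object> message) {
    if (timestampField.isPresent()) {
      Object value = message.get(timestampField.get());
      if (value instanceof Number) {
        return ((Number) value).longValue();
      }
      throw new IllegalArgumentException("missing or non-numeric field: " + timestampField.get());
    }
    return clock.getAsLong();
  }

  public static void main(String[] args) {
    Map<String, Object> message = new HashMap<>();
    message.put("ip_src_addr", "10.0.0.1");
    message.put("timestamp", 1530978728982L);

    TimestampSelector eventTime = new TimestampSelector(Optional.of("timestamp"), () -> 42L);
    TimestampSelector processingTime = new TimestampSelector(Optional.empty(), () -> 42L);

    System.out.println(eventTime.timestampOf(message));      // 1530978728982
    System.out.println(processingTime.timestampOf(message)); // 42 (the injected clock)
  }
}
```

Injecting the clock keeps the processing-time branch deterministic in a test. That is the same concern the integration test works around by sleeping and resending messages.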
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605058#comment-16605058 ] ASF GitHub Bot commented on METRON-1748: Github user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1174#discussion_r215452935 --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java --- @@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception { kafkaComponent.writeMessages(inputTopic, message2); kafkaComponent.writeMessages(inputTopic, message3); +// retrieve the profile measurement using PROFILE_GET +String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; +List actuals = execute(profileGetExpression, List.class); + // storm needs at least one message to close its event window int attempt = 0; -while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { +while(actuals.size() == 0 && attempt++ < 10) { - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); + // wait for the profiler to flush + long sleep = windowDurationMillis; + LOG.debug("Waiting {} millis for profiler to flush", sleep); + Thread.sleep(sleep); - // send another message to help close the current event window + // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism. 
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which + // will ultimately flush the profile kafkaComponent.writeMessages(inputTopic, message2); + + // retrieve the profile measurement using PROFILE_GET + actuals = execute(profileGetExpression, List.class); } -// validate what was flushed -List actuals = read( -profilerTable.getPutLog(), -columnFamily, -columnBuilder.getColumnQualifier("value"), -Integer.class); -assertEquals(1, actuals.size()); +// the profile should count at least 3 messages +assertTrue(actuals.size() > 0); assertTrue(actuals.get(0) >= 3); } + /** + * The Profiler can generate profiles based on processing time. With processing time, + * the Profiler builds profiles based on when the telemetry is processed. + * + * Not defining a 'timestampField' within the Profiler configuration tells the Profiler + * to use processing time. + * + * There are two mechanisms that will cause a profile to flush. + * + * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each + * message (which can be either event or system time.) This advances time and leads to profile + * measurements being flushed. + * + * (2) If no messages arrive to advance time, then the "time to live" mechanism will flush a profile + * after a period of time. + * + * This test specifically tests the *second* mechanism when a profile is flushed by the + * "time to live" mechanism. + */ + @Test + public void testProcessingTimeWithTimeToLiveFlush() throws Exception { + +// upload the config to zookeeper --- End diff -- Can we remove the redundant javadoc? Your code speaks for itself here. I'd probably just rename that method precisely as "uploadConfigToZookeeper". Very clear and with 1 less non-executable line. 
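The two flush mechanisms described in the javadoc in this diff can be modelled in a few lines. The sketch below is a deliberately simplified, hypothetical model and not Metron's implementation. It keeps one count for a single entity. In mechanism (1), the count is flushed when a message arrives whose timestamp falls in a later period than the one being built. In mechanism (2), the count is flushed when `flushExpired` is called and no message has arrived for longer than a time-to-live. All names, the period length, and the TTL are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushModelSketch {

  private final long periodMillis;
  private final long ttlMillis;
  private final List<Long> flushed = new ArrayList<>();  // counts emitted so far

  private long currentPeriod = -1;  // period being built; -1 means none in progress
  private long count = 0;
  private long lastSeenAt = 0;      // wall-clock time the last message arrived

  FlushModelSketch(long periodMillis, long ttlMillis) {
    this.periodMillis = periodMillis;
    this.ttlMillis = ttlMillis;
  }

  /** Mechanism (1): a message whose timestamp falls in a later period flushes the current one. */
  void onMessage(long timestamp, long now) {
    long period = timestamp / periodMillis;
    if (currentPeriod >= 0 && period > currentPeriod) {
      flush();
    }
    if (currentPeriod < 0) {
      currentPeriod = period;  // start building a new period
    }
    count++;
    lastSeenAt = now;
  }

  /** Mechanism (2): with no new messages, flush once the profile has been idle longer than the TTL. */
  void flushExpired(long now) {
    if (currentPeriod >= 0 && now - lastSeenAt > ttlMillis) {
      flush();
    }
  }

  List<Long> flushed() {
    return flushed;
  }

  private void flush() {
    flushed.add(count);
    count = 0;
    currentPeriod = -1;
  }

  public static void main(String[] args) {
    // Mechanism (1): three messages in period 0, then one in period 1 advances time.
    FlushModelSketch normal = new FlushModelSketch(1000, 5000);
    normal.onMessage(100, 100);
    normal.onMessage(200, 200);
    normal.onMessage(300, 300);
    normal.onMessage(1100, 1100);        // period 1, so period 0 is flushed
    System.out.println(normal.flushed()); // [3]

    // Mechanism (2): three messages, then silence until the TTL expires.
    FlushModelSketch ttl = new FlushModelSketch(1000, 5000);
    ttl.onMessage(100, 100);
    ttl.onMessage(200, 200);
    ttl.onMessage(300, 300);
    ttl.flushExpired(2000);              // idle 1700 ms: nothing flushed
    ttl.flushExpired(6000);              // idle 5700 ms > 5000 ms TTL: flushed
    System.out.println(ttl.flushed());    // [3]
  }
}
```

Both runs emit the same count. The difference is what triggers the flush, which is why a test aimed at the TTL path has to avoid sending the extra "advance time" messages that the processing-time test relies on.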
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605057#comment-16605057 ] ASF GitHub Bot commented on METRON-1748: Github user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1174#discussion_r215450093 --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java --- @@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception { kafkaComponent.writeMessages(inputTopic, message2); kafkaComponent.writeMessages(inputTopic, message3); +// retrieve the profile measurement using PROFILE_GET +String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; +List actuals = execute(profileGetExpression, List.class); + // storm needs at least one message to close its event window int attempt = 0; -while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { +while(actuals.size() == 0 && attempt++ < 10) { - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); + // wait for the profiler to flush + long sleep = windowDurationMillis; + LOG.debug("Waiting {} millis for profiler to flush", sleep); + Thread.sleep(sleep); - // send another message to help close the current event window + // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism. 
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which + // will ultimately flush the profile kafkaComponent.writeMessages(inputTopic, message2); + + // retrieve the profile measurement using PROFILE_GET + actuals = execute(profileGetExpression, List.class); } -// validate what was flushed -List actuals = read( -profilerTable.getPutLog(), -columnFamily, -columnBuilder.getColumnQualifier("value"), -Integer.class); -assertEquals(1, actuals.size()); +// the profile should count at least 3 messages +assertTrue(actuals.size() > 0); assertTrue(actuals.get(0) >= 3); } + /** + * The Profiler can generate profiles based on processing time. With processing time, + * the Profiler builds profiles based on when the telemetry is processed. --- End diff -- Is processing time == system time or are these actually 2 different types of timestamp in addition to event/source time?
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605060#comment-16605060 ] ASF GitHub Bot commented on METRON-1748: Github user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1174#discussion_r215456000 --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java --- @@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception { kafkaComponent.writeMessages(inputTopic, message2); kafkaComponent.writeMessages(inputTopic, message3); +// retrieve the profile measurement using PROFILE_GET +String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; +List actuals = execute(profileGetExpression, List.class); + // storm needs at least one message to close its event window int attempt = 0; -while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { +while(actuals.size() == 0 && attempt++ < 10) { - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); + // wait for the profiler to flush + long sleep = windowDurationMillis; + LOG.debug("Waiting {} millis for profiler to flush", sleep); + Thread.sleep(sleep); - // send another message to help close the current event window + // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism. 
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which + // will ultimately flush the profile kafkaComponent.writeMessages(inputTopic, message2); + + // retrieve the profile measurement using PROFILE_GET + actuals = execute(profileGetExpression, List.class); } -// validate what was flushed -List actuals = read( -profilerTable.getPutLog(), -columnFamily, -columnBuilder.getColumnQualifier("value"), -Integer.class); -assertEquals(1, actuals.size()); +// the profile should count at least 3 messages +assertTrue(actuals.size() > 0); --- End diff -- For my own understanding, is the List that's returned as `actuals` a List of profile periods? I looked at the profiler.json in test resources and it looks like it's counting the number of messages that have ip_src_addr. Is that correct? Under what condition(s) would actuals.size() != 1 for the purposes of this test? I think that having a clear understanding of the associated profile for this test and why I would expect a value of 3 (as it's not just because of the flushing mechanism) would be useful for future maintainers. Along those lines, as a small recommendation for readability (by no means a firm requirement here), but you might consider using the multiline string annotation that we've leveraged in other areas of code. It allows the config to sit right alongside the test code itself. Just easier to read imo when the size of the multiline string is still manageable/small.
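On why the assertion is `>= 3` rather than `== 3`, this sketch makes two assumptions. First, following the reviewer's reading of `profiler.json`, the profile counts messages per `ip_src_addr`. Second, `message1` through `message3` all come from `10.0.0.1`, which the thread does not show. Under those assumptions, the first flushed period holds at least those three messages. Each retry in the loop writes another copy of `message2`, and any copy that lands in the same period adds to the count. The hypothetical simulation below illustrates that reasoning with made-up timestamps and a made-up one-minute period. It is not the Profiler's code.

```java
import java.util.HashMap;
import java.util.Map;

public class CountProfileSketch {

  /** Counts messages per (entity, period) key: a stand-in for a per-ip_src_addr count profile. */
  static Map<String, Integer> count(String[] entities, long[] timestamps, long periodMillis) {
    Map<String, Integer> counts = new HashMap<>();
    for (int i = 0; i < entities.length; i++) {
      String key = entities[i] + "@" + (timestamps[i] / periodMillis);
      counts.merge(key, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    long period = 60_000L;  // hypothetical one-minute period
    // message1..3 at t = 0s, 1s, 2s; a retry writes message2 again at t = 30s (same period)
    String[] entities = {"10.0.0.1", "10.0.0.1", "10.0.0.1", "10.0.0.1"};
    long[] timestamps = {0L, 1_000L, 2_000L, 30_000L};
    System.out.println(count(entities, timestamps, period));  // {10.0.0.1@0=4}
  }
}
```

The author describes the `PROFILE_GET` result as a list of profile measurements. Assuming there is one measurement per flushed period, `actuals.size()` could also exceed 1 whenever the 5-minute lookback given by `PROFILE_FIXED('5', 'MINUTES')` spans more than one flushed period.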
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605061#comment-16605061 ] ASF GitHub Bot commented on METRON-1748: Github user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1174#discussion_r215458574 --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java --- @@ -186,21 +266,40 @@ public void testEventTime() throws Exception { // start the topology and write test messages to kafka fluxComponent.submitTopology(); -kafkaComponent.writeMessages(inputTopic, message1); -kafkaComponent.writeMessages(inputTopic, message2); -kafkaComponent.writeMessages(inputTopic, message3); - -// wait until the profile is flushed -waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); - -List puts = profilerTable.getPutLog(); -assertEquals(1, puts.size()); - -// inspect the row key to ensure the profiler used event time correctly. 
the timestamp -// embedded in the row key should match those in the source telemetry -byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt); -byte[] actualRowKey = puts.get(0).getRow(); -assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); +List messages = FileUtils.readLines(new File("src/test/resources/telemetry.json")); +kafkaComponent.writeMessages(inputTopic, messages); + +long timestamp = System.currentTimeMillis(); +LOG.debug("Attempting to close window period by sending message with timestamp = {}", timestamp); +kafkaComponent.writeMessages(inputTopic, getMessage("192.168.66.1", timestamp)); +kafkaComponent.writeMessages(inputTopic, getMessage("192.168.138.158", timestamp)); + +// create the 'window' that looks up to 5 hours before the last timestamp contained in the telemetry +assign("lastTimestamp", "1530978728982L"); --- End diff -- What's the significance of this specific value `1530978728982L`? Is that tied to the last record timestamp in the telemetry input data file?
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605059#comment-16605059 ] ASF GitHub Bot commented on METRON-1748: Github user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1174#discussion_r215452413 --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java --- @@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception { kafkaComponent.writeMessages(inputTopic, message2); kafkaComponent.writeMessages(inputTopic, message3); +// retrieve the profile measurement using PROFILE_GET +String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; +List actuals = execute(profileGetExpression, List.class); + // storm needs at least one message to close its event window int attempt = 0; -while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { +while(actuals.size() == 0 && attempt++ < 10) { - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); + // wait for the profiler to flush + long sleep = windowDurationMillis; + LOG.debug("Waiting {} millis for profiler to flush", sleep); + Thread.sleep(sleep); - // send another message to help close the current event window + // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism. 
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which + // will ultimately flush the profile kafkaComponent.writeMessages(inputTopic, message2); + + // retrieve the profile measurement using PROFILE_GET + actuals = execute(profileGetExpression, List.class); } -// validate what was flushed -List actuals = read( -profilerTable.getPutLog(), -columnFamily, -columnBuilder.getColumnQualifier("value"), -Integer.class); -assertEquals(1, actuals.size()); +// the profile should count at least 3 messages +assertTrue(actuals.size() > 0); assertTrue(actuals.get(0) >= 3); } + /** + * The Profiler can generate profiles based on processing time. With processing time, + * the Profiler builds profiles based on when the telemetry is processed. + * + * Not defining a 'timestampField' within the Profiler configuration tells the Profiler + * to use processing time. + * + * There are two mechanisms that will cause a profile to flush. + * + * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each + * message (which can be either event or system time.) This advances time and leads to profile + * measurements being flushed. + * + * (2) If no messages arrive to advance time, then the "time to live" mechanism will flush a profile + * after a period of time. + * + * This test specifically tests the *second* mechanism when a profile is flushed by the + * "time to live" mechanism. + */ + @Test + public void testProcessingTimeWithTimeToLiveFlush() throws Exception { --- End diff -- Might it be better to have the core of this test's javadoc documentation in the main README and simply reference the core terminology from the test? Actually, your test name already has that info (test flushing via TTL), so even that is probably redundant. 
My concern is that the info is encoded in all of:
* test code
* test javadoc
* source code
* source javadoc (maybe it's not, but I'm assuming there's also doc there)
* project READMEs

I think expressing the functionality in test/source code is of course good because it's executable. And READMEs are great as a set of expectations about how the project works. But my concern would be that if this functionality gets updated by someone other than you, then they'll have to be sure it's updated in 5 places. If a test gets updated it might be easy to miss updating associated javadoc of this detail. It's the kind of thing that Bob Martin, Martin Fowler, and Kent Beck are constantly railing about wrt stale docs.
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593830#comment-16593830 ] ASF GitHub Bot commented on METRON-1748: Github user nickwallen commented on the issue: https://github.com/apache/metron/pull/1174 BTW - This should be ready to go. I fixed the sporadic test failures.
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592109#comment-16592109 ] ASF GitHub Bot commented on METRON-1748: Github user nickwallen commented on the issue: https://github.com/apache/metron/pull/1174 The CI build hit what seems to be an unrelated, intermittent test failure. ``` ZKConfigurationsCacheIntegrationTest.validateUpdate:230->lambda$validateUpdate$9:230 expected:<{hdfs={index=yaf, batchSize=1, enabled=true}, elasticsearch={index=yaf, batchSize=25, batchTimeout=7, enabled=false}, solr={index=yaf, batchSize=5, enabled=false}}> but was:<{}> ``` Kicking Travis.
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592111#comment-16592111 ] ASF GitHub Bot commented on METRON-1748: GitHub user nickwallen reopened a pull request: https://github.com/apache/metron/pull/1174 METRON-1748 Improve Storm Profiler Integration Test Improved the Storm Profiler integration tests based on improvements I made for the Spark Profiler feature branch. * Validate the output of the Profiler using the client library: `PROFILE_GET`. This is better validation that things are working end-to-end. * Using more telemetry messages to validate event time processing. * Added logging around cache maintenance. * Added logging of the tuple windows that are passed from Storm's WindowManager. This can help debug issues where the time lag and window size are incorrectly set. ## Testing This only changes the integration tests and adds logging. Running the integration tests is sufficient to test the changes there. Spinning up the development environment is sufficient if extra caution is desired. ## Pull Request Checklist - [x] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel). - [x] Does your PR title start with METRON- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Have you included steps to reproduce the behavior or problem that is being changed or addressed? - [x] Have you included steps or a guide to how the change may be verified and tested manually? - [x] Have you ensured that the full suite of tests and checks have been executed in the root metron folder via: - [x] Have you written or updated unit tests and or integration tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent? You can merge this pull request into a Git repository by running: $ git pull https://github.com/nickwallen/metron METRON-1748 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/metron/pull/1174.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1174 commit 56b70f666dd75a2861d17a23b7ed307124ba1f74 Author: Nick Allen Date: 2018-08-17T17:13:10Z METRON-1748 Improve Storm Profiler Integration Test commit 33516a52788a764005a7bc3946034bb6a2858201 Author: Nick Allen Date: 2018-08-23T18:54:35Z Small changes to tuple window logging commit fbb2fcd50df7e5812ed63479891f56b65adc899f Author: Nick Allen Date: 2018-08-24T13:01:44Z Need to wait windowLagMillis + periodDurationMillis before sending another message to close the event window commit bf160d521ba8530f556fdc610c6845bf3065e8fb Author: Nick Allen Date: 2018-08-24T13:49:08Z Updated the event time test so that both periods contained in the data are validated
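The third commit ("Need to wait windowLagMillis + periodDurationMillis before sending another message to close the event window") reflects how event-time windows close. The sketch below uses a simplified model of Storm's event-time windowing, which is an assumption here and not the exact `WindowManager` logic. In this model, the watermark trails the largest event timestamp seen so far by the configured lag. A window [start, start + duration) is treated as closed once the watermark reaches its end. `EventWindowSketch` and its methods are hypothetical, and the durations are made up.

```java
public class EventWindowSketch {

  /** Watermark: the largest event timestamp seen so far, minus the allowed lag. */
  static long watermark(long maxEventTimestamp, long lagMillis) {
    return maxEventTimestamp - lagMillis;
  }

  /** A window [start, start + duration) is treated as closed once the watermark reaches its end. */
  static boolean isClosed(long windowStart, long durationMillis, long maxEventTimestamp, long lagMillis) {
    return watermark(maxEventTimestamp, lagMillis) >= windowStart + durationMillis;
  }

  public static void main(String[] args) {
    long start = 0L;
    long duration = 30_000L;  // hypothetical window duration
    long lag = 10_000L;       // hypothetical window lag

    // A message just past the window end is not enough: the lag holds the window open.
    System.out.println(isClosed(start, duration, 30_001L, lag));                 // false
    // A message at least duration + lag after the window start closes it.
    System.out.println(isClosed(start, duration, start + duration + lag, lag));  // true
  }
}
```

In this model, a message that merely lands after the window end is not enough. The triggering message must carry a timestamp at least `duration + lag` past the window start, which matches the commit's point about waiting for both values before sending the closing message.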
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592110#comment-16592110 ] ASF GitHub Bot commented on METRON-1748: Github user nickwallen closed the pull request at: https://github.com/apache/metron/pull/1174
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590854#comment-16590854 ] ASF GitHub Bot commented on METRON-1748:

Github user nickwallen commented on the issue:

    https://github.com/apache/metron/pull/1174

I am seeing sporadic integration test failures from `ProfilerIntegrationTest.testProcessingTimeWithTimeToLiveFlush` with this change. I need to track that down before this gets merged.
[jira] [Commented] (METRON-1748) Improve Storm Profiler Integration Test
[ https://issues.apache.org/jira/browse/METRON-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590666#comment-16590666 ] ASF GitHub Bot commented on METRON-1748:

GitHub user nickwallen opened a pull request:

    https://github.com/apache/metron/pull/1174

METRON-1748 Improve Storm Profiler Integration Test

Improved the Storm Profiler integration tests based on improvements I made for the Spark Profiler feature branch.

* Validated the output of the Profiler using the client library (`PROFILE_GET`). This is better validation that things are working end-to-end.
* Used more telemetry messages to validate event time processing.
* Added logging around cache maintenance.
* Added logging of the tuple windows that are passed from Storm's WindowManager. This can help debug issues where the time lag and window size are incorrectly set.

## Pull Request Checklist

- [ ] Is there a JIRA ticket associated with this PR? If not, one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
- [ ] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [ ] Have you included steps to reproduce the behavior or problem that is being changed or addressed?
- [ ] Have you included steps or a guide to how the change may be verified and tested manually?
- [ ] Have you ensured that the full suite of tests and checks has been executed in the root metron folder via:
- [ ] Have you written or updated unit tests and/or integration tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickwallen/metron METRON-1748

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/1174.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1174

commit 56b70f666dd75a2861d17a23b7ed307124ba1f74
Author: Nick Allen
Date:   2018-08-17T17:13:10Z

    METRON-1748 Improve Storm Profiler Integration Test
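The first change listed in this pull request validates results through the client library rather than by inspecting storage directly. Because the Profiler writes measurements only after a period is flushed, a test that queries through a client typically has to poll until the expected values appear or a timeout expires. The minimal Java sketch below shows that polling pattern under those assumptions. A plain `Supplier` stands in for the `PROFILE_GET` call, and `EventuallyAssert` and `waitFor` are hypothetical names, not Metron's integration test API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical polling helper; not part of Metron's integration test framework. */
public class EventuallyAssert {

  /**
   * Re-runs the query until the condition holds or the timeout elapses.
   * Returns true on success, false on timeout or interruption.
   */
  public static <T> boolean waitFor(Supplier<T> query, Predicate<T> condition,
                                    long timeoutMillis, long pollMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
      if (condition.test(query.get())) {
        return true;
      }
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt flag for the caller
        return false;
      }
    }
  }

  public static void main(String[] args) {
    // Stand-in for a PROFILE_GET call: each poll "finds" one more measurement.
    List<Integer> measurements = new ArrayList<>();
    Supplier<List<Integer>> fakeProfileGet = () -> {
      measurements.add(measurements.size());
      return new ArrayList<>(measurements);
    };
    boolean found = waitFor(fakeProfileGet, values -> values.size() >= 3, 5_000, 10);
    System.out.println("expected measurements found: " + found);
  }
}
```

Bounding the poll with a timeout lets a slow flush still pass, while a Profiler that never writes the expected values still fails the test.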