[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=115174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-115174 ]

ASF GitHub Bot logged work on BEAM-3863:

    Author: ASF GitHub Bot
    Created on: 24/Jun/18 12:33
    Start Date: 24/Jun/18 12:33
    Worklog Time Spent: 10m
    Work Description: stale[bot] closed pull request #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 45be8b691cd..88bcb7250f2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -230,9 +230,10 @@ public void clear(TriggerContext c) throws Exception {
   @Override
   public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
     Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
+    Instant currentTime = getCurrentTime(context);
     return delayedUntil != null
-        && getCurrentTime(context) != null
-        && getCurrentTime(context).isAfter(delayedUntil);
+        && currentTime != null
+        && (currentTime.isEqual(delayedUntil) || currentTime.isAfter(delayedUntil));
   }
 
   @Override
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
index 9fbf801693b..a83d2ab996f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
@@ -72,7 +72,7 @@ public void testAfterProcessingTimeFixedWindows() throws Exception {
     tester.injectElements(2, 3);
 
     // Advance past the first timer and fire, finishing the first window
-    tester.advanceProcessingTime(new Instant(16));
+    tester.advanceProcessingTime(new Instant(15));
     assertTrue(tester.shouldFire(firstWindow));
     assertFalse(tester.shouldFire(secondWindow));
     tester.fireIfShouldFire(firstWindow);

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

    Worklog Id: (was: 115174)
    Time Spent: 1h 50m (was: 1h 40m)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Aljoscha Krettek
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
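The change in the diff above comes down to a boundary comparison. The following is a minimal sketch of that comparison, not the Beam code itself: plain millisecond `long` values stand in for Joda-Time `Instant`s, and the names `TriggerBoundarySketch`, `shouldFireStrict` and `shouldFireInclusive` are made up for illustration. The deadline value 15 simply mirrors the instant used in the patched test.

```java
/** Illustration only: millisecond longs stand in for the Instants used by the state machine. */
final class TriggerBoundarySketch {

    /** Pre-patch style check: true only when the clock is strictly past the deadline. */
    static boolean shouldFireStrict(Long delayedUntil, Long currentTime) {
        return delayedUntil != null && currentTime != null && currentTime > delayedUntil;
    }

    /** Patched style check: also true when the clock is exactly at the deadline. */
    static boolean shouldFireInclusive(Long delayedUntil, Long currentTime) {
        return delayedUntil != null && currentTime != null && currentTime >= delayedUntil;
    }

    public static void main(String[] args) {
        long delayedUntil = 15L;
        // Walk the clock across the deadline and print both decisions side by side.
        for (long now = 14L; now <= 16L; now++) {
            System.out.printf("now=%d strict=%b inclusive=%b%n",
                now, shouldFireStrict(delayedUntil, now), shouldFireInclusive(delayedUntil, now));
        }
    }
}
```

If a timer callback runs at exactly the deadline, the strict variant reports "do not fire", and nothing else re-evaluates the trigger afterwards, which would be consistent with the missed late firings described in the issue.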
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=115173&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-115173 ]

ASF GitHub Bot logged work on BEAM-3863:

    Author: ASF GitHub Bot
    Created on: 24/Jun/18 12:33
    Start Date: 24/Jun/18 12:33
    Worklog Time Spent: 10m
    Work Description: stale[bot] commented on issue #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875#issuecomment-399753458

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

Issue Time Tracking
---

    Worklog Id: (was: 115173)
    Time Spent: 1h 40m (was: 1.5h)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Aljoscha Krettek
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> *Issue*
> The Beam AfterProcessingTime trigger doesn't always fire reliably after a
> configured delay.
> The triggers in the following job should fire after the watermark passes the
> end of the window, then every 5 seconds for late data, and finally at the end
> of allowed lateness.
> *Expected behaviour*
> The late-firing AfterProcessingTime trigger should fire 5 seconds after the
> first late record arrives in the pane.
> *Actual behaviour*
> In my tests, late triggers work for some keys but not for others -
> it's pretty random which keys are affected. The DummySource generates 15
> distinct keys AA,BB,..., PP. For each key it sends 5 on-time records and one
> late record. If a late trigger firing is missed, the pane won't fire until the
> end of the allowed lateness period.
> *Job code*
> {code:java}
> String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
> Pipeline pipeline = Pipeline.create(options);
> // NOTE: generic type arguments were stripped from the archived message.
> // The String element type below is an assumption based on the AA..PP keys.
> PCollection<String> apply = pipeline.apply(Read.from(new DummySource()))
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withLateFirings(
>                 AfterProcessingTime
>                     .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>         .accumulatingFiredPanes()
>         .withAllowedLateness(Duration.standardMinutes(2),
>             Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     );
> apply.apply(Count.perElement())
>     .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
>         @ProcessElement
>         public void process(ProcessContext context, BoundedWindow window) {
>             LOG.info("Count: {}. For window {}, Pane {}",
>                 context.element(), window, context.pane());
>         }
>     }));
> pipeline.run().waitUntilFinish();{code}
>
> *How can you replicate the issue?*
> I've created a github repo
> [https://github.com/pbartoszek/BEAM-3863_late_trigger] with the code shown
> above. Please check out the README file for details on how to replicate the
> issue.
> *What is causing the issue?*
> I explained the cause in the PR.
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
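The timing in the issue description can be worked through with a little arithmetic. The sketch below uses `java.time` rather than the Joda-Time types Beam uses, and the class, method and constant names are hypothetical; only the three durations (10 s window, 5 s late-firing delay, 2 min allowed lateness) come from the job code quoted above. Note that the late firing is measured in processing time, while window expiry is measured in event time, so the two results are not directly comparable clocks.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustration only: durations are taken from the quoted job, everything else is assumed. */
final class LateFiringTimeline {
    static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
    static final Duration LATE_FIRING_DELAY = Duration.ofSeconds(5);
    static final Duration ALLOWED_LATENESS = Duration.ofMinutes(2);

    /** Processing time at which the late pane is expected to fire. */
    static Instant expectedLateFiring(Instant firstLateElementProcessingTime) {
        return firstLateElementProcessingTime.plus(LATE_FIRING_DELAY);
    }

    /** Event time at which the window expires (end of window plus allowed lateness). */
    static Instant windowExpiry(Instant windowStart) {
        return windowStart.plus(WINDOW_SIZE).plus(ALLOWED_LATENESS);
    }

    public static void main(String[] args) {
        Instant windowStart = Instant.parse("2018-03-15T12:00:00Z");
        Instant firstLateSeen = Instant.parse("2018-03-15T12:00:20Z");
        System.out.println("late pane expected at (processing time): " + expectedLateFiring(firstLateSeen));
        System.out.println("window expires at (event time): " + windowExpiry(windowStart));
    }
}
```

Under these numbers, a missed late firing means the late record's count is only emitted when the window expires, which can be close to two minutes later than the intended five seconds.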
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=112615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112615 ]

ASF GitHub Bot logged work on BEAM-3863:

    Author: ASF GitHub Bot
    Created on: 17/Jun/18 11:36
    Start Date: 17/Jun/18 11:36
    Worklog Time Spent: 10m
    Work Description: stale[bot] commented on issue #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875#issuecomment-397872801

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

Issue Time Tracking
---

    Worklog Id: (was: 112615)
    Time Spent: 1.5h (was: 1h 20m)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Aljoscha Krettek
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=92081&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92081 ]

ASF GitHub Bot logged work on BEAM-3863:

    Author: ASF GitHub Bot
    Created on: 18/Apr/18 11:10
    Start Date: 18/Apr/18 11:10
    Worklog Time Spent: 10m
    Work Description: aljoscha commented on issue #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875#issuecomment-382350903

Basically the other option that @pbartoszek mentioned above.

Issue Time Tracking
---

    Worklog Id: (was: 92081)
    Time Spent: 1h 20m (was: 1h 10m)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=92080&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92080 ]

ASF GitHub Bot logged work on BEAM-3863:

    Author: ASF GitHub Bot
    Created on: 18/Apr/18 11:09
    Start Date: 18/Apr/18 11:09
    Worklog Time Spent: 10m
    Work Description: aljoscha commented on issue #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875#issuecomment-382350767

@robertwb (and also @pbartoszek) when I said "Flink Side" I meant Flink Runner. We can't easily change the way Flink timers behave but we can adapt in the Flink Runner similarly to how we do it for event-time timers.

Issue Time Tracking
---

    Worklog Id: (was: 92080)
    Time Spent: 1h 10m (was: 1h)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=91005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91005 ]

ASF GitHub Bot logged work on BEAM-3863:

    Author: ASF GitHub Bot
    Created on: 13/Apr/18 22:55
    Start Date: 13/Apr/18 22:55
    Worklog Time Spent: 10m
    Work Description: robertwb commented on issue #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875#issuecomment-381279458

Should we close this PR if it was fixed on the Flink side?

Issue Time Tracking
---

    Worklog Id: (was: 91005)
    Time Spent: 1h (was: 50m)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=81927&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81927 ]

ASF GitHub Bot logged work on BEAM-3863:

    Author: ASF GitHub Bot
    Created on: 19/Mar/18 17:13
    Start Date: 19/Mar/18 17:13
    Worklog Time Spent: 10m
    Work Description: aljoscha commented on issue #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875#issuecomment-374292182

I would be in favour of fixing this on the Flink side, because there is a mismatch in when Flink thinks a timer should fire, as highlighted by https://github.com/apache/beam/blob/6e4de883efc5e3cb2278d9f2be5ab94e33cf1bf8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L606

Issue Time Tracking
---

    Worklog Id: (was: 81927)
    Time Spent: 50m (was: 40m)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=81796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81796 ]

ASF GitHub Bot logged work on BEAM-3863:

    Author: ASF GitHub Bot
    Created on: 19/Mar/18 09:33
    Start Date: 19/Mar/18 09:33
    Worklog Time Spent: 10m
    Work Description: pbartoszek commented on issue #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875#issuecomment-374151191

As suggested by @aljoscha, the other way to fix this for the Flink runner only would be to add 1 ms when registering a timer in Flink. The + 1 ms could be done, for instance, [here](https://github.com/apache/beam/blob/6e4de883efc5e3cb2278d9f2be5ab94e33cf1bf8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1051)

Issue Time Tracking
---

    Worklog Id: (was: 81796)
    Time Spent: 40m (was: 0.5h)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
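The "+ 1 ms" idea in the comment above can be sketched as follows. This is an illustration under stated assumptions, not the Flink runner's code: `ProcessingTimeTimers` is a hypothetical stand-in interface (not a Flink or Beam API), `paddedTimestamp` and `registerBeamTimer` are made-up helpers, and the sketch assumes the underlying timer service may invoke the callback as soon as the clock equals the registered timestamp.

```java
/** Illustration only: pads a timer by one millisecond before handing it to a timer service. */
final class PaddedTimerRegistration {

    /** Hypothetical stand-in for the runner's timer service; not a real Flink or Beam type. */
    interface ProcessingTimeTimers {
        void register(long timestampMillis);
    }

    /** Shifts the requested timestamp one millisecond later, failing loudly on overflow. */
    static long paddedTimestamp(long beamTimerMillis) {
        return Math.addExact(beamTimerMillis, 1L);
    }

    /** Registers the padded timestamp instead of the one the trigger asked for. */
    static void registerBeamTimer(ProcessingTimeTimers timers, long beamTimerMillis) {
        timers.register(paddedTimestamp(beamTimerMillis));
    }

    public static void main(String[] args) {
        long delayedUntil = 15L;
        registerBeamTimer(
            registered -> System.out.println(
                "requested=" + delayedUntil
                    + " registered=" + registered
                    + " strictlyAfterDeadline=" + (registered > delayedUntil)),
            delayedUntil);
    }
}
```

With the padding in place, a callback that runs exactly at the registered time already sees a clock strictly after `delayedUntil`, so a strict "is after" check in the trigger would pass without changing the trigger itself.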
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=81794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81794 ]

ASF GitHub Bot logged work on BEAM-3863:

    Author: ASF GitHub Bot
    Created on: 19/Mar/18 09:24
    Start Date: 19/Mar/18 09:24
    Worklog Time Spent: 10m
    Work Description: pbartoszek commented on issue #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875#issuecomment-374144179

As suggested by @aljoscha, another way to fix the issue in the Flink runner only would be to change the logic in the toFlinkRuntimeWatermark method [here](https://github.com/apache/beam/blob/6e4de883efc5e3cb2278d9f2be5ab94e33cf1bf8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L606)

Issue Time Tracking
---

    Worklog Id: (was: 81794)
    Time Spent: 0.5h (was: 20m)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
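For context on the `toFlinkRuntimeWatermark` suggestion above, here is a hedged sketch of the kind of off-by-one adaptation such a method can perform for event-time timers. The method body (subtracting one millisecond) and the inclusive firing rule are assumptions made for illustration, not quotes from `DoFnOperator`; the class name and `inclusiveTimerFires` are hypothetical.

```java
/** Illustration only: reconciles a strict firing rule with an inclusive timer service. */
final class WatermarkConversionSketch {

    /** Assumed shape of the conversion: move the watermark back by one millisecond. */
    static long toFlinkRuntimeWatermark(long beamWatermarkMillis) {
        return beamWatermarkMillis - 1;
    }

    /** Assumed rule of the underlying service: a timer fires once the watermark reaches it. */
    static boolean inclusiveTimerFires(long timerMillis, long watermarkMillis) {
        return timerMillis <= watermarkMillis;
    }

    public static void main(String[] args) {
        long timer = 15L;
        // Feed converted watermarks to the inclusive service and observe when the timer fires.
        for (long beamWatermark = 14L; beamWatermark <= 16L; beamWatermark++) {
            long converted = toFlinkRuntimeWatermark(beamWatermark);
            System.out.printf("beamWatermark=%d converted=%d fires=%b%n",
                beamWatermark, converted, inclusiveTimerFires(timer, converted));
        }
    }
}
```

Under these assumptions the timer at 15 fires only once the unconverted watermark is strictly greater than 15, which is the event-time counterpart of the strict "is after" comparison discussed for processing time.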
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=81788=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81788 ]

ASF GitHub Bot logged work on BEAM-3863:

Author: ASF GitHub Bot
Created on: 19/Mar/18 09:06
Start Date: 19/Mar/18 09:06
Worklog Time Spent: 10m
Work Description: pbartoszek commented on issue #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875#issuecomment-374144179

As suggested by @aljoscha, another way to fix the issue, in the Flink runner only, would be to change the logic in the `toFlinkRuntimeWatermark` method [here](https://github.com/apache/beam/blob/6e4de883efc5e3cb2278d9f2be5ab94e33cf1bf8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L606)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 81788)
Time Spent: 20m (was: 10m)

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> *Issue*
> Beam's AfterProcessingTime trigger doesn't always fire reliably after a configured delay.
> The triggers in the following job should fire after the watermark passes the end of the window, then every 5 seconds for late data, and finally at the end of the allowed lateness.
> *Expected behaviour*
> The late-firing processing-time trigger should fire 5 seconds after the first late record arrives in the pane.
[jira] [Work logged] (BEAM-3863) AfterProcessingTime trigger doesn't fire reliably
[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=81216=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81216 ]

ASF GitHub Bot logged work on BEAM-3863:

Author: ASF GitHub Bot
Created on: 16/Mar/18 16:34
Start Date: 16/Mar/18 16:34
Worklog Time Spent: 10m
Work Description: pbartoszek opened a new pull request #4875: BEAM-3863: AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875

The issue with the current logic is that if the `delayedUntil` timestamp is equal to the current time when `shouldFire` is called, then `shouldFire` returns false. The trigger doesn't fire, and it never fires again unless a new element arrives for the same key and window or the allowed lateness is hit.

In practice, when I tested with the Flink runner on a cluster, it was very likely on sufficiently powerful machines that both steps happen within the same millisecond: the timer fires at the `delayedUntil` timestamp and calls `public void onTimers(Iterable timers) throws Exception` in `ReduceFnRunner`, and the current time is then probed via `getCurrentTime(context)` in `AfterDelayFromFirstElementStateMachine.shouldFire()`.

The suggested fix is to check whether the current timestamp is equal to or greater than the `delayedUntil` timestamp.

I created a test job that replicates the issue: https://github.com/pbartoszek/BEAM-3863_late_trigger

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
- [ ] Write a pull request description that is detailed enough to understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line and body.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 81216)
Time Spent: 10m
Remaining Estimate: 0h

> AfterProcessingTime trigger doesn't fire reliably
> -
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> *Issue*
> Beam's AfterProcessingTime trigger doesn't always fire reliably after a configured delay.
>
> The triggers in the following job should fire after the watermark passes the end of the window, then every 5 seconds for late data, and finally at the end of the allowed lateness.
>
> *Expected behaviour*
> The late-firing processing-time trigger should fire 5 seconds after the first late record arrives in the pane.
>
> *Actual behaviour*
> In my testing, late triggers work for some keys but not for others; it's pretty random which keys are affected. The DummySource generates 15 distinct keys AA,BB,..., PP. For each key it sends 5 on-time records and one late record.
In case late trigger firing is missed it won't fire until the > allowed lateness period. > *Job code* > {code:java} > String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"}; > FlinkPipelineOptions options = > PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class); > Pipeline pipeline = Pipeline.create(options); > PCollection apply = pipeline.apply(Read.from(new DummySource())) > > .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))) > .triggering(AfterWatermark.pastEndOfWindow() > .withLateFirings( >