[jira] [Updated] (BEAM-7853) PubsubIO and FixedWindows
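[ https://issues.apache.org/jira/browse/BEAM-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-7853:
    Labels: stale-P2  (was: )

> PubsubIO and FixedWindows
> -------------------------
>
> Key: BEAM-7853
> URL: https://issues.apache.org/jira/browse/BEAM-7853
> Project: Beam
> Issue Type: Bug
> Components: beam-model
> Reporter: Gregory Parsons
> Priority: P2
> Labels: stale-P2
>
> Hi all,
>
> I may have run into an issue with windowing when reading from Cloud Pub/Sub
> with PubsubIO.
>
> I am finding that FixedWindows do not fire on either the DirectRunner or the
> DataflowRunner when followed by a GroupByKey transform.
>
> A basic pipeline with my use case looks like this:
>
> {code:java}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
> import org.apache.beam.sdk.transforms.GroupByKey;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
>
> Pipeline p = Pipeline.create();
> // Event timestamps are taken from the "time" attribute of each message.
> PubsubIO.Read<PubsubMessage> read = PubsubIO
>     .readMessagesWithAttributes()
>     .withTimestampAttribute("time")
>     .fromTopic("test-topic");
> // 10-second fixed windows that fire once the watermark passes the end of the window.
> Window<PubsubMessage> window = Window.<PubsubMessage>into(
>         FixedWindows.of(Duration.standardSeconds(10L))
>     )
>     .triggering(AfterWatermark.pastEndOfWindow())
>     .withAllowedLateness(Duration.standardSeconds(10L))
>     .discardingFiredPanes();
> // ConvertToMapOnKey and LogGroupedEvents are custom DoFns, not shown here.
> PCollection<KV<String, Iterable<PubsubMessage>>> windowedMessages = p
>     .apply("Read Events", read)
>     .apply("Apply Window", window)
>     .apply("Convert to KV", ParDo.of(new ConvertToMapOnKey()))
>     .apply("Group by key", GroupByKey.create())
>     .apply("Log Pairs", ParDo.of(new LogGroupedEvents()));{code}
>
> LogGroupedEvents logs the key as a string along with the grouped
> PubsubMessages, but it never receives any output from the GroupByKey.
>
> I have stripped the pipeline down to demonstrate the issue and removed its
> actual use case, so grouping and logging simple messages may look odd; that
> is not what the real pipeline does.
>
> If I swap the windowing function for a global window with a repeated
> processing-time trigger, it works correctly: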
> {code:java}
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
> import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
> import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
> import org.apache.beam.sdk.transforms.windowing.Repeatedly;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.joda.time.Duration;
>
> // Global window that fires `duration` seconds (processing time) after the
> // first element of each pane, repeatedly.
> Window<PubsubMessage> getDefaultWindow(Long duration) {
>     return Window.<PubsubMessage>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime
>                 .pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(duration))
>         ))
>         .withAllowedLateness(Duration.standardSeconds(10L))
>         .discardingFiredPanes();
> }
> {code}
>
> This could be due to my not understanding windowing and triggers, but
> according to the documentation and many examples online, a plain
> FixedWindows is all people use, because per the Beam docs it automatically
> fires a trigger at the end of each window:
>
> [https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function]
> (see the example in section 7.3.1).
>
> I have researched how windowing works internally as much as I can; we
> arrived at our trigger-based workaround by reading the source code.
>
> Let me know if there is any other information you need from me to help look
> into this.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
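The difference between the two windowing configurations in the report comes down to when each trigger is allowed to fire. Below is a simplified, standard-library-only Java model of the documented behaviour of `FixedWindows.of(size)` with `AfterWatermark.pastEndOfWindow()`: each element goes into the window `[start, start + size)` that contains its event timestamp, and a window fires only once the watermark reaches that window's end. This is a hypothetical sketch, not Beam's implementation; the class `FixedWindowModel` and its methods are invented for illustration. The watermark is simply a number passed in by the caller (in a real PubsubIO read with `withTimestampAttribute` the runner estimates it from the timestamp attribute, which this model does not attempt to reproduce), and allowed lateness and pane accumulation are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model, NOT Beam's implementation: event-time fixed windows
// that fire only when an externally supplied watermark reaches the end of the
// window, mimicking AfterWatermark.pastEndOfWindow().
public class FixedWindowModel {
    private final long sizeMillis;
    // Window start -> event timestamps buffered in that window, ordered by start.
    private final TreeMap<Long, List<Long>> buffered = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    public FixedWindowModel(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    // Start of the fixed window (offset 0) that contains the timestamp.
    public long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Buffers an element in the window matching its event timestamp.
    public void add(long timestampMillis) {
        buffered.computeIfAbsent(windowStart(timestampMillis), k -> new ArrayList<>())
                .add(timestampMillis);
    }

    // Advances the watermark (it never moves backwards) and returns a description
    // of every window [start, start + size) whose end the watermark has reached.
    public List<String> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> fired = new ArrayList<>();
        while (!buffered.isEmpty()) {
            Map.Entry<Long, List<Long>> first = buffered.firstEntry();
            long end = first.getKey() + sizeMillis;
            if (watermark < end) {
                break; // watermark has not passed this window yet; later windows end later
            }
            fired.add("[" + first.getKey() + ", " + end + ") -> "
                    + first.getValue().size() + " element(s)");
            buffered.pollFirstEntry();
        }
        return fired;
    }

    public static void main(String[] args) {
        FixedWindowModel model = new FixedWindowModel(10_000L); // 10-second windows
        model.add(1_000L);
        model.add(9_500L);
        model.add(12_000L);
        System.out.println(model.advanceWatermark(9_999L));  // []
        System.out.println(model.advanceWatermark(10_000L)); // [[0, 10000) -> 2 element(s)]
        System.out.println(model.advanceWatermark(25_000L)); // [[10000, 20000) -> 1 element(s)]
    }
}
```

In this model, elements can pile up indefinitely without any output if the watermark never advances past a window's end, however much wall-clock time elapses. That is the property that distinguishes an event-time trigger from a processing-time one.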
[jira] [Updated] (BEAM-7853) PubsubIO and FixedWindows
[ https://issues.apache.org/jira/browse/BEAM-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismaël Mejía updated BEAM-7853:
    Status: Open  (was: Triage Needed)

> PubsubIO and FixedWindows
> -------------------------
>
> Key: BEAM-7853
> URL: https://issues.apache.org/jira/browse/BEAM-7853
> Project: Beam
> Issue Type: Bug
> Components: beam-model
> Reporter: Gregory Parsons
> Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
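For contrast, the workaround in the report uses `GlobalWindows` with `Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...))` and `discardingFiredPanes()`. The sketch below is again a hypothetical, standard-library-only model rather than Beam code (`ProcessingTimeTriggerModel` and its methods are invented for illustration): a pane fires a fixed delay of wall-clock time after its first element arrives, and event timestamps and the watermark play no part. This matches the reporter's observation that this variant produces output, but it does not explain why the `FixedWindows` version produced none.

```java
// Hypothetical toy model, NOT Beam's implementation: a single global window whose
// trigger fires a fixed processing-time delay after the first element of each pane,
// mimicking Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
// .plusDelayOf(delay)) with discardingFiredPanes().
public class ProcessingTimeTriggerModel {
    private final long delayMillis;
    private int paneSize = 0;              // elements buffered in the current pane
    private long fireAt = Long.MAX_VALUE;  // processing time at which the pane fires

    public ProcessingTimeTriggerModel(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Records an element arriving at the given processing (wall-clock) time.
    public void onElement(long arrivalMillis) {
        if (paneSize == 0) {
            fireAt = arrivalMillis + delayMillis; // first element of a new pane starts the delay
        }
        paneSize++;
    }

    // Advances the wall clock; returns the size of the pane that fires, or 0 if none.
    public int advanceProcessingTime(long nowMillis) {
        if (paneSize > 0 && nowMillis >= fireAt) {
            int fired = paneSize;
            paneSize = 0;              // discardingFiredPanes: the next pane starts empty
            fireAt = Long.MAX_VALUE;
            return fired;
        }
        return 0;
    }

    public static void main(String[] args) {
        ProcessingTimeTriggerModel model = new ProcessingTimeTriggerModel(10_000L);
        model.onElement(0L);
        model.onElement(3_000L);
        System.out.println(model.advanceProcessingTime(9_999L));  // 0
        System.out.println(model.advanceProcessingTime(10_000L)); // 2
        model.onElement(11_000L);
        System.out.println(model.advanceProcessingTime(21_000L)); // 1
    }
}
```

Because `Repeatedly.forever` re-arms the trigger, each new pane starts its own delay when its first element arrives, which is why the model resets `fireAt` after firing.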