One suspicion I have is that the watermark could be lagging behind a bit. Have you looked at that?
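
One quick way to check would be a pass-through DoFn dropped in right before SendWindowToAPI that logs each element's timestamp, the max timestamp of its window, and the pane info. This is just a rough, untested sketch (the class name and its placement are made up for illustration); comparing the wall-clock time of the log lines with the window end shows how far behind the watermark-driven ON_TIME firing actually is:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical debugging step: logs timing info and passes elements through unchanged.
    class LogWindowTiming<T> extends DoFn<T, T> {
      private static final Logger LOG = LoggerFactory.getLogger(LogWindowTiming.class);

      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        // c.timestamp() is the element timestamp, window.maxTimestamp() is the window end,
        // and c.pane() says whether this is an EARLY, ON_TIME or LATE firing.
        LOG.info("element ts={} windowMaxTs={} pane={}",
            c.timestamp(), window.maxTimestamp(), c.pane());
        c.output(c.element());
      }
    }

If the ON_TIME panes only show up minutes after windowMaxTs, the watermark of the Pubsub subscription is lagging, which would explain both the file writes and the REST calls arriving late.
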
> On 7. May 2017, at 22:44, Josh <[email protected]> wrote:
> 
> Thanks for the replies.
> @Ankur I tried putting a GroupByKey between the Window.into and the sink, and it didn't seem to make any difference...
> @Aljoscha I see, that makes sense - so the windowed write code (which uses TextIO.write().withWindowedWrites()) is not closing the files as soon as the window has ended?
> 
> I was trying this out with windowed writes, but what I really want to do doesn't involve windowed writes. I am actually trying to do this:
> 
> pipeline
>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>     .apply(ParDo.of(new MapToKV()))
>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>     .apply(Combine.perKey(new MyCombineFn()))
>     .apply(ParDo.of(new SendWindowToAPI()));
> 
> So here Combine.perKey will do a GroupByKey and then MyCombineFn will aggregate the values for each key. I then want to use another DoFn, SendWindowToAPI, which will ping the aggregate result for each window to a REST API. I am trying to hack it this way for now since there is no RestIO sink yet.
> 
> I'm having the same problem doing this as when running my windowed file write example - the SendWindowToAPI DoFn seems to only ping the API after a few minutes / 30+ messages have been sent, rather than immediately after each window.
> 
> Any ideas what's going on here?
> 
> Thanks,
> Josh
> 
> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <[email protected]> wrote:
> Hi,
> First, a bit of clarification (or refinement): a windowing strategy is used in all subsequent GroupByKey operations until another windowing strategy is specified. That being said, from quickly glancing at the windowed write code I have the suspicion that triggers are not used for windowed writing and that instead some other scheme is used for determining when to close a file.
> 
> I’m sure others with more knowledge of those parts will jump in later, but I nevertheless wanted to give you this quick answer.
> 
> Best,
> Aljoscha
> 
>> On 7. May 2017, at 00:57, Ankur Chauhan <[email protected]> wrote:
>> 
>> That I believe is expected behavior. The windowing strategy is applied at the following GroupByKey operation. To have the windows fire the way you want, try putting a GroupByKey immediately after the desired windowing function.
>> 
>> The messages right now are being bundled aggressively for performance reasons, and doing a GroupByKey would ensure the desired bundle delineations.
>> 
>> Ankur Chauhan
>> Sent from my iPhone
>> 
>> On May 6, 2017, at 14:11, Josh <[email protected]> wrote:
>> 
>>> Hi all,
>>> I am using a PubsubIO source, windowing every 10 seconds and then doing something with the windows, for example:
>>> 
>>> pipeline
>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>     .apply(MapElements
>>>         .into(TypeDescriptors.strings())
>>>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>>>     .apply(new WriteOneFilePerWindow(..));
>>> 
>>> My expectation was that if I publish a pubsub message, and then publish another 10+ seconds later, a single file should be written for the previous 10-second window. However, I find that I need to publish a lot of messages for any files to be written at all (e.g. 30+ messages).
>>> Is this expected behaviour when using PubsubIO? Is there a way to tweak it to fire the windows more eagerly?
>>> Thanks,
>>> Josh
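
A side note on the trigger discussion above, purely as a sketch I have not run: if the goal is to emit per-window aggregates as soon as possible even while the watermark lags, early firings in processing time can be added to the windowing before the Combine.perKey. The element type (KV<String, Long>) and the delay / allowed-lateness values below are placeholders to adjust for the real pipeline; the trigger classes come from org.apache.beam.sdk.transforms.windowing:

    // Sketch only: AfterWatermark still produces the final ON_TIME pane,
    // the early firings emit speculative panes on processing time before that.
    .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.standardMinutes(1))   // placeholder value
        .discardingFiredPanes())
    .apply(Combine.perKey(new MyCombineFn()))

With discardingFiredPanes() each early pane only combines the elements that arrived since the previous firing; accumulatingFiredPanes() would instead give a growing aggregate per window, at the cost of SendWindowToAPI seeing the same window several times.
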
