One suspicion I have is that the watermark could be lagging behind a bit. Have 
you looked at that?
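
If it is indeed the watermark, one thing you could try (just a sketch, untested) 
is adding early firings, so that panes are emitted on processing time even while 
the watermark is behind:

.apply(Window.<KV<String, MyValue>>into(FixedWindows.of(Duration.standardSeconds(10)))
    // AfterWatermark still closes the window as before, but the early firings
    // emit a pane shortly after the first element arrives in a window,
    // regardless of how far the watermark is lagging.
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(5))))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())

(MyValue is just a placeholder for whatever value type MapToKV emits.)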

> On 7. May 2017, at 22:44, Josh <[email protected]> wrote:
> 
> Thanks for the replies.
> @Ankur I tried putting a GroupByKey between the Window.into and the sink, and 
> it didn't seem to make any difference...
> @Aljoscha I see, that makes sense - so the windowed write code (which uses 
> TextIO.write().withWindowedWrites()) is not closing the files as soon as the 
> window has ended?
> 
> I was trying this out with windowed writes, but what I really want to do 
> doesn't involve windowed writes. I am actually trying to do this:
> 
> pipeline
> .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
> .apply(ParDo.of(new MapToKV()))
> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
> .apply(Combine.perKey(new MyCombineFn()))
> .apply(ParDo.of(new SendWindowToAPI()));
> 
> So here Combine.perKey will do a GroupByKey, and then MyCombineFn will 
> aggregate the values for each key. I then want to use another DoFn, 
> SendWindowToAPI, which will push the aggregate result for each window to a 
> REST API. I'm hacking it this way for now since there is no RestIO sink yet.
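> 
> For reference, SendWindowToAPI is roughly this (the value type Long and the 
> REST client are simplified placeholders):
> 
> public class SendWindowToAPI extends DoFn<KV<String, Long>, Void> {
>   @ProcessElement
>   public void processElement(ProcessContext c, BoundedWindow window) {
>     // One combined value per key should arrive here per 10s window;
>     // post it to the REST endpoint together with the window's end timestamp.
>     postToRestApi(c.element().getKey(), c.element().getValue(), window.maxTimestamp());
>   }
> 
>   private void postToRestApi(String key, Long value, Instant windowEnd) {
>     // plain HTTP client call, omitted here
>   }
> }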
> 
> I'm having the same problem with this as with my windowed-file-writing 
> example - the SendWindowToAPI DoFn seems to ping the API only after a few 
> minutes / 30+ messages have been sent, rather than immediately after each 
> window.
> 
> Any ideas what's going on here?
> 
> Thanks,
> Josh
> 
> 
> 
> 
> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <[email protected]> wrote:
> Hi,
> First, a bit of clarification (or refinement): a windowing strategy is used 
> in all subsequent GroupByKey operations until another windowing strategy is 
> specified. That being said, from quickly glancing at the windowed-write code 
> I have the suspicion that triggers are not used for windowed writing and that 
> instead some other scheme is used to determine when to close a file.
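> 
> To make that concrete, a rough sketch (SomeDoFn is just a placeholder):
> 
>   .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>   .apply(ParDo.of(new SomeDoFn()))   // nothing is grouped here yet, elements
>                                      // just carry their window information
>   .apply(GroupByKey.create())        // <- the 10s windowing is first applied here,
>                                      //    and at every later GroupByKey, until
>                                      //    another Window.into(...) is specified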
> 
> I’m sure others with more knowledge of those parts will jump in later but I 
> nevertheless wanted to give you this quick answer.
> 
> Best,
> Aljoscha
> 
>> On 7. May 2017, at 00:57, Ankur Chauhan <[email protected]> wrote:
>> 
>> That, I believe, is expected behavior. The windowing strategy is applied at 
>> the following GroupByKey operation. To have the windows fire the way you 
>> want, try putting a GroupByKey immediately after the desired windowing 
>> transform.
>> 
>> The messages right now are being bundled aggressively for performance 
>> reasons, and doing a GroupByKey would ensure the desired bundle delineations. 
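>> 
>> Roughly like this (K and V stand for your key/value types, untested):
>> 
>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>> .apply(GroupByKey.<K, V>create())     // windows / bundles get materialized here
>> .apply(Values.<Iterable<V>>create())  // drop the keys again if you don't need them
>> .apply(Flatten.iterables())           // back to individual elements, now per window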
>> 
>> Ankur Chauhan
>> Sent from my iPhone
>> 
>> On May 6, 2017, at 14:11, Josh <[email protected]> wrote:
>> 
>>> Hi all,
>>> I am using a PubSubIO source, windowing every 10 seconds and then doing 
>>> something with the windows, for example:
>>> pipeline
>>>     .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>>>     .apply(MapElements
>>>         .into(TypeDescriptors.strings())
>>>         .via((PubsubMessage msg) -> msg.getAttribute(..)))
>>>     .apply(new WriteOneFilePerWindow(..));
>>> My expectation was that if I publish a pubsub message, and then publish 
>>> another 10+ seconds later, a single file should be written for the previous 
>>> 10-second window. However, I find that I need to publish a lot of messages 
>>> (e.g. 30+) before any files are written at all. 
>>> Is this expected behaviour when using PubSubIO? Is there a way to tweak it 
>>> to fire the windows more eagerly?
>>> Thanks,
>>> Josh 
> 
> 
