Thanks! I made a Jira: https://issues.apache.org/jira/browse/BEAM-7322
And dumped my sample code here: https://github.com/tims/beam/tree/master/pubsub-watermark

*From: *Alexey Romanenko <[email protected]>
*Date: *Wed, May 15, 2019 at 12:18 AM
*To: * <[email protected]>

Not sure whether this is helpful, but I recall a similar issue with KinesisIO [1][2]; it turned out to be a bug in MovingFunction, which was fixed.

[1] https://issues.apache.org/jira/browse/BEAM-5063
[2] https://github.com/apache/beam/pull/6178

On 13 May 2019, at 20:52, Kenneth Knowles <[email protected]> wrote:

You should definitely not feel foolish. That was a great report. I expect many users face the same situation. If they are lurking on this list, then you will have helped them already.

Reza - I expect you should weigh in on the Jira too, since the "one message test" use case seems like it wouldn't work at all with those MovingFunction params. But I may not understand all the subtleties of the connector.

Kenn

*From: *Tim Sell <[email protected]>
*Date: *Mon, May 13, 2019 at 8:06 AM
*To: * <[email protected]>

Thanks for the feedback. I did some more investigating after you said a 1-second frequency should be enough to sample on... and it is; I feel foolish. I think I just wasn't waiting long enough, as it takes minutes for the windows to close. We waited much longer when we were just sending messages manually and never saw a window close.

I'm generating some stats of lag times to window closing for different frequencies, with code so people can reproduce it; then I'll add it all to a Jira ticket.

*From: *Kenneth Knowles <[email protected]>
*Date: *Mon, May 13, 2019 at 10:48 AM
*To: * <[email protected]>, dev

Nice analysis & details!

Thanks to your info, I think it is the configuration of MovingFunction [1] that is the likely culprit, but I don't totally understand why. It is configured like so:

- store 60 seconds of data
- update data every 5 seconds
- require at least 10 messages to be 'significant'
- require messages from at least 2 distinct 5-second update periods to be 'significant'

I would expect a rate of 1 message per second to satisfy this. I may have read something wrong.

Have you filed an issue in Jira [2]?

Kenn

[1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
[2] https://issues.apache.org/jira/projects/BEAM/issues
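
For reference, the configuration Kenn describes corresponds roughly to the following in PubsubUnboundedSource. This is a sketch recalled from the file linked in [1]; the constant names and the exact MovingFunction constructor signature may differ from the code as it stands.

    // Illustrative sketch of the watermark-sampling setup in PubsubUnboundedSource.
    private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);  // store 60s of data
    private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);  // update every 5s
    private static final int MIN_WATERMARK_SPREAD = 2;    // timestamps from >= 2 distinct update periods
    private static final int MIN_WATERMARK_MESSAGES = 10; // >= 10 messages before the sample is 'significant'

    MovingFunction minUnreadTimestampMsSinceEpoch =
        new MovingFunction(
            SAMPLE_PERIOD.getMillis(),
            SAMPLE_UPDATE.getMillis(),
            MIN_WATERMARK_SPREAD,
            MIN_WATERMARK_MESSAGES,
            Min.ofLongs());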

*From: *Tim Sell <[email protected]>
*Date: *Fri, May 10, 2019 at 4:09 AM
*To: * <[email protected]>

Hello,

I have identified an issue where the watermark does not advance when using the Beam PubsubIO if volumes are very low.

The behaviour is easily replicated if you apply fixed windows that trigger after the watermark passes the end of the window:

    // kvs(), strings(), integers() are static imports from
    // org.apache.beam.sdk.values.TypeDescriptors.
    pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
        .apply(ParDo.of(new ParseScoreEventFn()))
        .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.standardSeconds(60))
            .discardingFiredPanes())
        .apply(MapElements.into(kvs(strings(), integers()))
            .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
        .apply(Count.perKey())
        .apply(ParDo.of(Log.of("counted per key")));

With this triggering, using both the Flink local runner and the direct runner, *no panes will ever be emitted* if the volume of messages in Pub/Sub is very low, e.g. 1 per second.

If I change the triggering to add early firings, I get exactly the panes you would expect:

    .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .alignedTo(Duration.standardSeconds(60))))
        .withAllowedLateness(Duration.standardSeconds(60))
        .discardingFiredPanes())

I can use any variation of early-firing triggers and they work as expected.

We believe that the watermark is not advancing at all when the volume is too low, because of the sampling that PubsubIO does to determine its watermark: it just never has a large enough sample. This problem occurs in the direct runner and the Flink runner, but not in the Dataflow runner (Dataflow uses its own Pub/Sub source, which has access to internal details of Pub/Sub and so doesn't need to sample).

I have also verified that for high volumes of messages the PubsubIO *does* successfully advance the watermark. Here's a Python script I wrote to mass-produce random messages:

    import json
    import random

    from google.cloud import pubsub_v1


    def publish_loop(n, project_id, topic_name):
        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(project_id, topic_name)
        rand = random.Random()
        players = ["eufy"]

        for i in range(n):
            score = rand.randint(1, 10)
            player = rand.choice(players)
            message = json.dumps({
                "player": player,
                "score": score,
            })
            print("'%s'" % message)
            publisher.publish(topic_path, data=message.encode("utf-8"))

Running my code without early firings on Dataflow, I verified it does count them as you'd expect.

<Screen Shot 2019-05-08 at 16.41.02.png>

Doing the same with the direct runner, it struggles to process messages at the rate they are being produced... but it does eventually close some windows. Here are screenshots of logs with early firings turned on and then off.

<Screen Shot 2019-05-08 at 17.02.14.png>

<Screen Shot 2019-05-08 at 17.21.46.png>

The key here is that you can see it logging the ON_TIME panes. This *never* happened for me when the message rate was as low as 1 per second.

Has anyone else seen this behaviour, where no ON_TIME panes are emitted at low volumes from a PubsubIO (when not using Dataflow)?
I believe the details that cause this are within the getWatermark function in PubsubUnboundedSource, but it looks too delicate for me to approach.

It's a problem because we ideally want it to behave well at low volumes too, but also because this is often one of the first streaming examples people try. We discovered this while trying to train people on streaming, and it was a bit awkward :)

Tim Sell
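
To make the failure mode concrete, here is a much-simplified sketch of the watermark logic discussed above. This is illustrative Java only, not the actual Beam implementation; the field and method names only approximate those in PubsubUnboundedSource. The reader feeds the timestamps of the messages it reads into the MovingFunction sketched earlier, and the watermark advances only while that sample counts as 'significant':

    // Illustrative only; not the actual Beam code.
    public Instant getWatermark() {
      long nowMsSinceEpoch = now().getMillis();
      if (minUnreadTimestampMsSinceEpoch.isSignificant()) {
        // Enough messages across enough update periods: advance the watermark
        // to the minimum timestamp in the current sample.
        lastWatermarkMsSinceEpoch = minUnreadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
      }
      // Otherwise the sample is not 'significant' and the watermark holds at
      // its previous value, which matches the behaviour reported here.
      return new Instant(lastWatermarkMsSinceEpoch);
    }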

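One last note for anyone reproducing the pipeline above: Log.of(...) is not a Beam built-in and is not defined in the thread. A minimal hypothetical sketch of such a helper, assuming SLF4J for logging:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical pass-through DoFn that logs each element with its pane info,
    // matching the ParDo.of(Log.of("counted per key")) usage above.
    public class Log<T> extends DoFn<T, T> {
      private static final Logger LOG = LoggerFactory.getLogger(Log.class);
      private final String prefix;

      private Log(String prefix) {
        this.prefix = prefix;
      }

      public static <T> Log<T> of(String prefix) {
        return new Log<>(prefix);
      }

      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        LOG.info("{}: {} pane={} window={}", prefix, c.element(), c.pane(), window);
        c.output(c.element());
      }
    }

With a helper like this, the EARLY vs. ON_TIME distinction visible in the logs comes from c.pane().getTiming().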