Hi Beam folks!

I'm running a simple Java Beam pipeline <https://pastebin.com/DY9X9ZVG> on
DirectRunner. The pipeline reads messages from a Pub/Sub topic and
aggregates them into two kinds of windows: one by processing time and one by
event time. The custom timestamp attribute option (withTimestampAttribute)
isn't used, so each element's event time should be the message's publish time.

What I'm observing is that the watermark-based trigger doesn't fire for 10+
minutes after the message is received by the pipeline:
https://pastebin.com/zgfDy5ej. I realize that PubsubIO.java's support for
watermarks is somewhat limited and relies on heuristics. However, looking
at how PubsubIO.Read computes watermarks
<https://github.com/apache/beam/blob/784d18b7ac89f87dd7fbf2861ee877f5b6070276/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L963>,
it appears to advance the watermark to now() whenever no messages have
arrived for over a minute, so the watermark shouldn't get stuck. In fact,
the comment here
<https://github.com/apache/beam/blob/784d18b7ac89f87dd7fbf2861ee877f5b6070276/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L94>
says that the watermark might even get *ahead* of the true watermark, which
seems consistent with the code.
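To make my reading concrete, here is a simplified model of the idle-advance
rule as I understand it. This is a hypothetical sketch, not the actual
PubsubUnboundedSource code: the class and method names are made up, the
one-minute threshold is taken from my reading above, and the real reader
tracks more state (for example, timestamps of pending messages) that I've
left out. I'm also assuming the rule only kicks in once at least one message
has been received.

```java
import java.time.Duration;

/**
 * Hypothetical, simplified model of the "advance to now() when idle" rule.
 * Not the real PubsubUnboundedSource implementation.
 */
public class IdleWatermarkModel {
    // Assumed threshold: "over a minute" without incoming messages.
    private static final long IDLE_THRESHOLD_MS = Duration.ofMinutes(1).toMillis();

    private long lastReceivedMs = -1; // wall-clock time of the latest message, -1 if none yet
    private long watermarkMs;         // last reported watermark

    public IdleWatermarkModel(long initialWatermarkMs) {
        this.watermarkMs = initialWatermarkMs;
    }

    /** Records that a message arrived at wall-clock time nowMs. */
    public void onMessageReceived(long nowMs) {
        lastReceivedMs = nowMs;
    }

    /** Returns the watermark as of wall-clock time nowMs. */
    public long getWatermark(long nowMs) {
        if (lastReceivedMs >= 0 && nowMs > lastReceivedMs + IDLE_THRESHOLD_MS) {
            // Idle for more than a minute: jump the watermark to wall-clock time.
            watermarkMs = nowMs;
        }
        // Otherwise (simplification): leave the watermark where it was.
        return watermarkMs;
    }

    public static void main(String[] args) {
        IdleWatermarkModel model = new IdleWatermarkModel(0L);
        model.onMessageReceived(10_000L);
        System.out.println(model.getWatermark(30_000L)); // 0: only 20 s idle
        System.out.println(model.getWatermark(80_000L)); // 80000: 70 s idle
    }
}
```

Under this model, once the last message is more than a minute old the
watermark catches up to wall-clock time, so an event-time window should close
shortly after its end, not 10+ minutes later.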

Is the behavior I'm observing expected at the moment? And if so, why?

Thanks in advance,
Alex
