Hi everyone. I've been debugging a streaming job on Dataflow for a while now, and I seem to have tracked the problem down to the Wait.on transform I'm using.
Some background: our pipeline ingests ~250,000 messages/sec from Pub/Sub, aggregates them in an hourly window, and emits the results. The final output from the combiner is ~20,000,000 keys, emitted at the end of the hour. These results are teed: they get written somewhere, and the keys are also sent into a Wait.on transform, where they wait for a signal before being written to Pub/Sub.

If I let this pipeline run for a couple of hours, the input processing rate eventually drops below the rate of messages entering the queue, and I get a bunch of deadline_exceeded errors from Windmill for one specific worker. At that point the pipeline is basically unrecoverable and needs to be restarted. If I remove the Wait transform, everything works great. My back-of-the-envelope calculation is that the elements going into the Wait transform total ~1 GB, so it's not a huge input either. My guess is there's some kind of O(n^2) operation happening here, because this same pipeline works fairly reliably with a smaller key space (~100,000 to 1 million keys).

The other interesting thing I've noticed is that the stage is constantly processing operations even when no messages are coming into it. For example, in my lower-scale case, the Windmill status page shows the stage "succeeding" ~100,000 active operations/sec even though no messages are going into the stage (since it's not the end of the hour). The stage has also written 10 GB of data, although the Dataflow UI says only 500 MB has gone into the Wait transform.

It seems like there might just be a bug here, but in the interim, is there any way to construct something similar to the Wait transform without using side inputs? My first guess was possibly a CoGroupByKey? Thanks!
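For what it's worth, the CoGroupByKey idea would amount to joining the data with the signal on a shared key and emitting a data element only once the signal side of its group is non-empty. Here is a minimal plain-Python sketch of that gating semantics (no Beam dependency; the function and key names are illustrative, not from the pipeline above):

```python
from collections import defaultdict


def cogroup_gate(data, signals):
    """Simulate CoGroupByKey-as-gate: group both inputs by key and
    emit a data element only if its key also received a signal.

    In a real Beam pipeline this would be a CoGroupByKey over two
    keyed PCollections in the same window; here it's a plain join.
    """
    grouped = defaultdict(lambda: {"data": [], "signal": []})
    for key, value in data:
        grouped[key]["data"].append(value)
    for key, value in signals:
        grouped[key]["signal"].append(value)
    # Emit only elements whose key's signal iterable is non-empty.
    return [
        (key, v)
        for key, groups in grouped.items()
        if groups["signal"]
        for v in groups["data"]
    ]


# Example: only keys that received a "done" signal pass through.
out = cogroup_gate(
    data=[("a", 1), ("b", 2), ("a", 3)],
    signals=[("a", "done")],
)
```

One caveat with this approach: the Wait signal is effectively per-window rather than per-key, so to use CoGroupByKey you would presumably have to either fan the signal out to every key (expensive at ~20M keys) or key everything by a constant (a single hot key), so it may trade one scaling problem for another.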