start() seems to be working, so I am focusing on advance(). Is there any way to prove whether I am blocked in advance() on the Dataflow runner? I have been through the code and cannot see anything, but I know that does not mean much.
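
One way to tell "advance() is blocked" apart from "advance() is no longer being called" might be to instrument the reader itself. The following is only a minimal sketch in plain Java: AdvanceWatchdog and its method names are hypothetical and not part of Beam or Dataflow, and it assumes the reader can call enter() at the top of advance() and exit() in a finally block.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical diagnostic helper (not a Beam class): records when a call to
// advance() starts and finishes so that a stuck call can be told apart from
// a reader that is simply never invoked.
final class AdvanceWatchdog {
    // Sentinel meaning "no call in flight". System.nanoTime() may be negative,
    // so Long.MIN_VALUE is used rather than -1 or 0.
    private static final long IDLE = Long.MIN_VALUE;

    private final AtomicLong enteredAtNanos = new AtomicLong(IDLE);
    private final AtomicLong completedCalls = new AtomicLong();

    // Call at the top of advance().
    void enter() {
        enteredAtNanos.set(System.nanoTime());
    }

    // Call in a finally block at the end of advance().
    void exit() {
        enteredAtNanos.set(IDLE);
        completedCalls.incrementAndGet();
    }

    // Number of advance() calls that have returned so far.
    long completedCalls() {
        return completedCalls.get();
    }

    // Milliseconds the current call has been running, or -1 if none is in flight.
    long inFlightMillis() {
        long entered = enteredAtNanos.get();
        return entered == IDLE ? -1L : (System.nanoTime() - entered) / 1_000_000L;
    }

    // True only when a call is in flight and has run for at least thresholdMillis.
    boolean isStuck(long thresholdMillis) {
        return inFlightMillis() >= thresholdMillis;
    }
}
```

If a background thread logs these values periodically, a call that stays in flight past the threshold suggests advance() is blocking, while a completedCalls() value that stops increasing with no call in flight suggests the runner has stopped calling advance() on that reader.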
Ken

On 2019/09/19 14:57:09, Jan Lukavský <je...@seznam.cz> wrote:
> Hi Ken,
>
> I have seen some deadlock behavior with custom sources earlier
> (different runner, but that might not be important). Some lessons learned:
>
> a) please make sure your advance() or start() methods do not block,
> that will cause issues and possibly the deadlocks you describe
>
> b) if you want to limit parallelism, that should be possible in the
> split() method - you can return a collection containing only (this) if
> there are no more readers
>
> Hope this helps, please feel free to ask for more details if needed.
>
> Best,
>
> Jan
>
> On 9/19/19 4:47 PM, Ken Barr wrote:
> > I have a custom UnboundedSource IO that reads from a series of messaging
> > queues. I have implemented this such that the IO takes a list of queues
> > and expands an UnboundedSource/UnboundedReader for each queue.
> >
> > If I use autoscaling with maxNumWorkers <= the number of queues, everything
> > works well. For example, if I have 4 queues and run in Dataflow, the
> > Dataflow process starts with 1 worker with 4 Readers, each consuming from a
> > queue. As CPU usage and backlog grow, Dataflow spawns more workers and
> > moves the Readers to the new workers. As CPU usage and backlog shrink,
> > the Readers are moved back to the original worker and unused workers are
> > deleted. This is exactly what I was hoping for.
> >
> > Problems happen if I set maxNumWorkers greater than the number of queues. As
> > scale-up goes past the number of queues, not only are Readers moved, but for
> > some reason new Readers are created. This should not be too bad; new
> > Readers would just not receive messages, as the original Reader is holding
> > exclusive access to ensure in-order delivery. The real problem is that the
> > original Readers are holding the queue and their advance() method is not
> > being called. The new Readers' advance() method is being called, but they
> > are not active on the queue, hence the system is now deadlocked.
> >
> > Questions are:
> >
> > Why are new Readers being spawned if maxNumWorkers exceeds the original
> > number of Readers? Is there a way of preventing this, as I would like to
> > maintain in-order delivery?
> >
> > Why is the original Readers' advance() method no longer being called? This
> > is causing a deadlock.
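
For reference, the two suggestions quoted above (an advance() that never blocks, and a split() that stops at one source per queue) can be sketched in plain Java. This is only an illustration under stated assumptions: QueueSource and QueueReader are hypothetical stand-ins rather than Beam's UnboundedSource and UnboundedReader, and the sketch does not show whether Dataflow would then stop creating extra Readers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the custom source (not a Beam class).
final class QueueSource {
    final List<String> queueNames;

    QueueSource(List<String> queueNames) {
        this.queueNames = queueNames;
    }

    // Models split(): returns at most one source per queue, regardless of
    // how many splits the runner asks for.
    List<QueueSource> split(int desiredNumSplits) {
        if (queueNames.size() <= 1) {
            // Nothing left to divide, so return only this source.
            return Collections.singletonList(this);
        }
        List<QueueSource> parts = new ArrayList<>();
        for (String name : queueNames) {
            parts.add(new QueueSource(Collections.singletonList(name)));
        }
        return parts;
    }
}

// Hypothetical stand-in for the reader (not a Beam class).
final class QueueReader {
    // Assumed to be filled by the messaging client's own receive thread.
    private final Queue<String> incoming;
    private String current;

    QueueReader(Queue<String> incoming) {
        this.incoming = incoming;
    }

    // Models advance(): poll() returns null immediately when the queue is
    // empty, so this method reports "no data yet" instead of waiting.
    boolean advance() {
        current = incoming.poll();
        return current != null;
    }

    String getCurrent() {
        return current;
    }
}
```

In this model, asking a four-queue source for 20 splits yields four single-queue sources, and splitting any of those again returns the same source.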