start() seems to be working, so I will focus on advance(). Is there any way to 
prove whether I am blocked in advance() on the Dataflow runner? I have been 
through the code and cannot see anything, but I know that does not mean much.
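
One way to check this from inside the reader, as a rough sketch only: wrap the body of advance() in a small tracker that records when a call starts and ends. AdvanceWatchdog and its method names are hypothetical and not part of Beam; the idea is that a log line or metric reading inFlightMillis() would show a call stuck inside advance(), while millisSinceLastExit() growing without bound would show advance() not being called at all.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not a Beam class): tracks calls to a reader's advance(). */
class AdvanceWatchdog {
    // Start time of the call currently in flight, or null when no call is running.
    private volatile Long enteredAtNanos = null;
    // End time of the most recent completed call, or null if none has completed.
    private volatile Long lastExitNanos = null;

    /** Runs the real advance() logic and records entry and exit times. */
    boolean track(BooleanSupplier advanceBody) {
        enteredAtNanos = System.nanoTime();
        try {
            return advanceBody.getAsBoolean();
        } finally {
            lastExitNanos = System.nanoTime();
            enteredAtNanos = null;
        }
    }

    /** Milliseconds the in-flight call has been running, or 0 if no call is running. */
    long inFlightMillis() {
        Long entered = enteredAtNanos;
        return entered == null ? 0 : (System.nanoTime() - entered) / 1_000_000;
    }

    /** Milliseconds since the last call returned, or -1 if no call has returned yet. */
    long millisSinceLastExit() {
        Long exited = lastExitNanos;
        return exited == null ? -1 : (System.nanoTime() - exited) / 1_000_000;
    }
}
```

Inside the reader, advance() would then delegate with something like `return watchdog.track(() -> pollQueue());`, where pollQueue() stands in for the existing logic. The two getters would be read from a separate thread, since a blocked advance() cannot report on itself.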

Ken
On 2019/09/19 14:57:09, Jan Lukavský <je...@seznam.cz> wrote: 
> Hi Ken,
> 
> I have seen some deadlock behavior with custom sources earlier 
> (different runner, but that might not be important). Some lessons learned:
> 
>   a) please make sure your advance() and start() methods do not block; 
> blocking there will cause issues and possibly the deadlocks you describe
> 
>   b) if you want to limit parallelism, that should be possible in the 
> split() method - you can return a collection containing only (this) if 
> there are no more readers
> 
> Hope this helps, please feel free to ask more details if needed.
> 
> Best,
> 
>   Jan
> 
> On 9/19/19 4:47 PM, Ken Barr wrote:
> > I have a custom UnboundedSource IO that reads from a series of messaging 
> > queues. I have implemented this such that the IO takes a list of queues 
> > and expands an UnboundedSource/UnboundedReader for each queue.
> >
> > If I use autoscaling with maxNumWorkers <= the number of queues, everything 
> > works well. For example, if I have 4 queues and run in Dataflow, the 
> > Dataflow process starts with 1 worker with 4 Readers, each consuming from a 
> > queue. As CPU usage and backlog grow, Dataflow spawns more workers and 
> > moves the Readers to the new workers. As CPU usage and backlog shrink, 
> > the Readers are moved back to the original worker and unused workers are 
> > deleted. This is exactly what I was hoping for.
> >
> > Problems happen if I set maxNumWorkers greater than the number of queues. As 
> > scale-up goes past the number of queues, not only are Readers moved, but for 
> > some reason new Readers are created. This should not be too bad: the new 
> > Readers would just not receive messages, as the original Reader is holding 
> > exclusive access to ensure in-order delivery. The real problem is that the 
> > original Readers are holding the queue and their advance() method is not 
> > being called. The new Readers' advance() method is being called, but they 
> > are not active on the queue, hence the system is now deadlocked.
> >
> > Questions are:
> > Why are new Readers being spawned if maxNumWorkers exceeds the original number 
> > of Readers? Is there a way of preventing this, as I would like to maintain 
> > in-order delivery?
> >
> > Why is the original Readers' advance() method no longer being called? This 
> > is causing a deadlock.
> 
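
For reference, a minimal sketch of what point (b) in Jan's reply could look like. QueueSource is a simplified, hypothetical stand-in written without the Beam dependency; a real source would override UnboundedSource.split(int desiredNumSplits, PipelineOptions options) instead. The sketch caps the number of splits at one per queue and returns only (this) once a source owns a single queue. Whether this alone stops Dataflow from creating the extra Readers is not something the sketch demonstrates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a per-queue UnboundedSource; not the Beam API. */
class QueueSource {
    private final List<String> queues;

    QueueSource(List<String> queues) {
        this.queues = new ArrayList<>(queues);
    }

    List<String> getQueues() {
        return Collections.unmodifiableList(queues);
    }

    /**
     * Returns at most one source per queue, however many splits are requested.
     * A source that already owns a single queue returns only itself.
     */
    List<QueueSource> split(int desiredNumSplits) {
        if (queues.size() <= 1) {
            return Collections.singletonList(this);
        }
        List<QueueSource> splits = new ArrayList<>();
        for (String queue : queues) {
            // One source per queue keeps a single consumer on each queue.
            splits.add(new QueueSource(Collections.singletonList(queue)));
        }
        return splits;
    }
}
```

With 4 queues, a request for 10 splits still yields 4 sources, and splitting any of those again returns the same instance.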
