You can ssh to the Dataflow worker and take a jstack of the harness to see
where it is spending its time. The most likely source of blocking is waiting
for data. The logic should be implemented so that if no data is available at
the moment, advance() simply returns false and does not wait for anything.
Another suggestion: focus on how your reader behaves when it is not assigned a
queue. I think the proper behavior would be to return false from each call to
advance() and to set the watermark to BoundedWindow.TIMESTAMP_MAX_VALUE to
indicate that no more data will ever arrive.
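To make that concrete, here is a rough sketch of what I mean inside your
UnboundedReader implementation. QueueConsumer, Message and the non-blocking
poll() are placeholders for whatever your messaging client provides, not Beam
API:

    // Reader fields; 'consumer' is null when this reader was given no queue.
    private QueueConsumer consumer;
    private Message current;
    private Instant currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    @Override
    public boolean advance() throws IOException {
      if (consumer == null) {
        // No queue assigned: report no data and push the watermark to the end
        // of time so the runner knows nothing more will ever arrive.
        currentWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
        return false;
      }
      Message msg = consumer.poll();   // must return immediately, never block
      if (msg == null) {
        return false;                  // nothing available right now; do not wait
      }
      current = msg;
      currentWatermark = msg.getTimestamp();
      return true;
    }

    @Override
    public Instant getWatermark() {
      return currentWatermark;
    }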
Jan
On 9/19/19 7:51 PM, Ken Barr wrote:
start() seems to be working, so let's focus on advance(). Is there any way to
prove whether I am blocked in advance() on the Dataflow runner? I have been
through the code and cannot see anything blocking, but I know that does not
mean much.
Ken
On 2019/09/19 14:57:09, Jan Lukavský <je...@seznam.cz> wrote:
Hi Ken,
I have seen some deadlock behavior with custom sources earlier
(different runner, but that might not be important). Some lessons learned:
a) please make sure your advance() and start() methods do not block; blocking
there will cause issues and possibly the deadlocks you describe
b) if you want to limit parallelism, that should be possible in the split()
method - you can return a collection containing only (this) if there should be
no more readers
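To illustrate b), something along these lines in your source should work
(QueueSource is just a placeholder name for your UnboundedSource subclass):

    @Override
    public List<QueueSource> split(int desiredNumSplits, PipelineOptions options) {
      // This source already represents a single queue, so refuse to split any
      // further, no matter how many splits the runner asks for.
      return Collections.singletonList(this);
    }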
Hope this helps - please feel free to ask for more details if needed.
Best,
Jan
On 9/19/19 4:47 PM, Ken Barr wrote:
I have a custom UnboundedSource IO that reads from a series of messaging
queues. I have implemented it so that the IO takes a list of queues and expands
into one UnboundedSource/UnboundedReader per queue.
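Roughly, the expansion happens in split() along these lines (class and field
names here are illustrative, not my actual code):

    @Override
    public List<QueueSource> split(int desiredNumSplits, PipelineOptions options) {
      // One sub-source (and therefore one reader) per queue, regardless of
      // how many splits the runner asks for.
      List<QueueSource> sources = new ArrayList<>();
      for (String queueName : queueNames) {
        sources.add(new QueueSource(Collections.singletonList(queueName)));
      }
      return sources;
    }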
If I use autoscaling with maxNumWorkers <= the number of queues, everything
works well. For example, if I have 4 queues and run in Dataflow, the job starts
with 1 worker holding 4 Readers, each consuming from a queue. As CPU usage and
backlog grow, Dataflow spawns more workers and moves the Readers to the new
workers. As CPU usage and backlog shrink, the Readers are moved back to the
original worker and the unused workers are deleted. This is exactly what I was
hoping for.
Problems happen if I set maxNumWorkers greater than the number of queues. As
scale-up goes past the number of queues, not only are Readers moved, but for
some reason new Readers are created. On its own this should not be too bad:
the new Readers would simply receive no messages, since the original Reader
holds exclusive access to ensure in-order delivery. The real problem is that
the original Readers are holding the queue but their advance() method is no
longer being called. The new Readers' advance() method is being called, but
they are not active on the queue, so the system is now deadlocked.
My questions are:
Why are new Readers being spawned if maxNumWorkers exceeds the original number
of Readers? Is there a way of preventing this, as I would like to maintain
in-order delivery?
Why is the original Readers' advance() method no longer being called? This is
causing the deadlock.