Hi Robert,

Thanks. A coworker and I have narrowed this down to a print(msg) statement
in one of my beam.Map() functions. If we remove the print(), the pipeline
does not stall. The print() is necessary but not sufficient: the problem
occurs only when many large messages are coming into the pipeline.

It may be that we have a deadlock. This pipeline creates the same
conditions as in Python issue42717 [1]. This part is especially relevant:

```
In your example, the thread is very likely to be busy doing IO, so holding
the io lock.
```

As in the above ticket, the thread dump I posted previously shows the same
message about "could not acquire lock ... at interpreter shutdown, possibly
due to daemon threads". A simple pure-Python program can reproduce this
issue [2].

So this explains the thread dump at shutdown. We still have the question of
why the pipeline stalled in the first place. My theory is that the worker
threads were already deadlocked. On my console, the worker logs stop
abruptly when this happens, which supports this hypothesis. If so, Python
merely detected the deadlock (via a timed wait on lock acquisition) and
reported it at shutdown, when I cancelled the pipeline job much later.
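To make "detected it with a timed wait on lock acquisition" concrete, here is a minimal pure-Python analogue. It is a sketch under assumptions, not CPython's code: as I understand the bpo-23309 fix, CPython does this check in C inside its buffered I/O objects, waiting about one second at interpreter shutdown before raising the "could not acquire lock" fatal error. `io_lock`, `stuck_writer`, and `try_acquire_or_report` below are hypothetical stand-ins.

```python
import threading

# Hypothetical stand-in for the lock guarding a buffered stream such as
# sys.stdout (CPython's real lock lives in C inside BufferedWriter).
io_lock = threading.Lock()
holder_has_lock = threading.Event()
holder_may_exit = threading.Event()


def stuck_writer():
    """Simulate a thread that takes the I/O lock and then blocks."""
    with io_lock:
        holder_has_lock.set()
        # Stand-in for a write() that does not return (e.g. a full pipe).
        holder_may_exit.wait()


def try_acquire_or_report(lock, timeout):
    """Wait a bounded time for `lock` instead of forever.

    Returns True if the lock was acquired (it is released again right
    away), or False if it was still held after `timeout` seconds.
    """
    if lock.acquire(timeout=timeout):
        lock.release()
        return True
    return False


t = threading.Thread(target=stuck_writer, daemon=True)
t.start()
holder_has_lock.wait()  # make sure the writer really holds the lock

acquired = try_acquire_or_report(io_lock, timeout=0.2)
print("acquired" if acquired else "could not acquire lock: possible deadlock")

# Let the simulated writer finish so the script exits cleanly.
holder_may_exit.set()
t.join()
```

Note that in this model a timeout only shows that the lock was held longer than the wait; it does not say why the holder was stuck.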

As for what causes the deadlock within the Beam pipeline, I'm not sure.
Suppose the print() statement in beam.Map() holds a lock (the stdout I/O
lock) and then also blocks. If two such threads in the Beam worker pool
contend for that lock, can a deadlock occur and stall the pipeline?
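The question above can be modelled without Beam. This is a hypothetical, pure-Python illustration with stated assumptions: `io_lock` stands in for the lock inside `sys.stdout`'s buffered writer, and an `os.pipe()` whose reader has stopped reading stands in for a stdout pipe that nobody drains (whether that actually happens to the SDK worker's stdout here is an unconfirmed guess). It shows that a pipe buffers a finite number of bytes, that a thread holding the lock then blocks inside `write()` once the pipe is full, and that a second thread cannot take the lock until the reader makes progress. It assumes Linux pipe semantics and uses `os.set_blocking`.

```python
import os
import threading

# Hypothetical stand-in for the lock inside sys.stdout's BufferedWriter.
io_lock = threading.Lock()
chunk = b"x" * 1024  # about the size of one 1 KB message in the pipeline

# A pipe whose reader has (for now) stopped reading.
r, w = os.pipe()

# 1. Fill the pipe without blocking to see how much it buffers.
os.set_blocking(w, False)
capacity = 0
try:
    while True:
        capacity += os.write(w, chunk)
except BlockingIOError:
    pass  # full: a blocking write() would now wait for the reader
os.set_blocking(w, True)

# 2. Thread A takes the lock, then blocks inside write() on the full pipe.
a_has_lock = threading.Event()


def writer_a():
    with io_lock:
        a_has_lock.set()
        os.write(w, chunk)  # does not return until the reader drains r


a = threading.Thread(target=writer_a, daemon=True)
a.start()
a_has_lock.wait()

# 3. Thread B (the main thread here) cannot get the lock while A is blocked.
b_got_lock = io_lock.acquire(timeout=0.5)
if b_got_lock:
    io_lock.release()  # not expected; keeps the demo consistent

# 4. Only when the reader makes progress does A finish and free the lock.
remaining = capacity
while remaining > 0:
    remaining -= len(os.read(r, remaining))
a.join(timeout=5)
a_finished = not a.is_alive()
b_got_lock_after_drain = io_lock.acquire(timeout=5)
if b_got_lock_after_drain:
    io_lock.release()

os.close(r)
os.close(w)
print(f"pipe buffered {capacity} bytes; B got lock while A blocked: {b_got_lock}")
```

If step 4 never happens (the reader is itself stuck), A and B both wait forever. Strictly, only one lock is involved in this model, so it is a back-pressure stall rather than a classic two-lock deadlock.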

Even if we don't get to the full root cause, could we do what the Python
folks did [3]: detect the condition somehow and abort when it happens?
Another option would be to have a single worker pool, so that we don't run
into multi-threading problems.
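One way to "check for the condition and abort" at the application level is a progress watchdog. The sketch below is a hypothetical helper, not a Beam or CPython API: `StallWatchdog`, `heartbeat()`, and `on_stall` are names made up for illustration, and calling `heartbeat()` from inside the beam.Map() function is an assumption about how it would be wired in.

```python
import threading
import time


class StallWatchdog:
    """Hypothetical helper: calls on_stall(idle_s) if heartbeats stop.

    Not a Beam API; it only sketches the "detect and abort" idea.
    """

    def __init__(self, timeout_s, on_stall, poll_s=0.05):
        self._timeout_s = timeout_s
        self._on_stall = on_stall
        self._poll_s = poll_s
        self._last = time.monotonic()
        self._lock = threading.Lock()
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()
        return self

    def heartbeat(self):
        """Record progress; call once per processed element."""
        with self._lock:
            self._last = time.monotonic()

    def stop(self):
        self._stopped.set()
        self._thread.join()

    def _run(self):
        # Wake up every poll_s seconds until stop() is called.
        while not self._stopped.wait(self._poll_s):
            with self._lock:
                idle = time.monotonic() - self._last
            if idle > self._timeout_s:
                self._on_stall(idle)  # e.g. dump stacks, then abort
                return


# Usage: steady heartbeats keep it quiet; silence triggers on_stall.
stalled = threading.Event()
dog = StallWatchdog(timeout_s=0.5, on_stall=lambda idle: stalled.set())
dog.start()
for _ in range(5):
    dog.heartbeat()  # stands in for "processed one element"
    time.sleep(0.05)
stalled_early = stalled.is_set()
stall_detected = stalled.wait(timeout=5)  # no more heartbeats from here on
dog.stop()
print("stall detected:", stall_detected)
```

In a real worker, `on_stall` could dump every thread's stack with `faulthandler.dump_traceback(all_threads=True)` and then call `os._exit(1)`; whether the runner then restarts the worker cleanly is not something this sketch verifies. The standard library's `faulthandler.dump_traceback_later(timeout, exit=True)` is a built-in alternative: calling it again resets the timeout, so re-arming it for every element gives a similar effect.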

To answer your question: no, the problem happens even when I write to a
Kafka sink. It turns out we only need a print() statement in a Python
beam.Map() call and sufficient I/O throughput to reproduce the problem.

Thanks,
Deepak

[1] https://bugs.python.org/issue42717
[2] https://bugs.python.org/file37835/dio.py
[3] https://bugs.python.org/issue23309

On Mon, May 16, 2022 at 9:41 AM Robert Bradshaw wrote:

> This is strange, especially the part about 1 vs. 2 map calls making a
> difference. Are the WindowInto and the write to the filesystem necessary
> to reproduce this as well?
>
> On Sat, May 14, 2022 at 10:25 PM Deepak Nagaraj wrote:
> >
> > Hi Beam users,
> >
> > I'm facing a problem with a Beam Python pipeline. It is running on
> > Flink, reading from Kafka in an unbounded way, and I have the
> > use_deprecated_read flag set. I then have 2 beam.Map() calls, followed
> > by a beam.WindowInto() and then a write to a file system.
> >
> > When I send a batch of 1000 small messages (20 bytes each), I have no
> > problems. However, when I send a batch of 1000 large messages (1 KB
> > each), the pipeline freezes after some time. The exact point varies;
> > however, I notice there is always a gap of 10-12 records between the
> > records sent to Kafka and the records received by the Python step.
> > This is remarkably consistent. Also, when I cancel the Flink job, I see
> > a set of stack traces on the console. [1]
> >
> > A similar Java pipeline works fine. The Python pipeline also works fine
> > if I have only one beam.Map() call. Adding a Reshuffle() to prevent
> > fusion makes no difference.
> >
> > It seems like we have a problem passing messages from stage to stage
> > within Python. Could there be a buffer of about 10-12 KB in the Python
> > SDK that fills up and then blocks the pipeline from making progress?
> >
> > Thanks,
> > Deepak
> >
> > [1] Sample Python stack trace; this is printed automatically when I
> > cancel the Beam pipeline job in the Flink UI:
> >
> > >
> > INFO:__main__:Stopping worker 1-1
> > Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
> > Python runtime state: finalizing (tstate=0x7f8849f048e0)
> >
> > Thread 0x000070000e577000 (most recent call first):
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap
> >
> > Thread 0x000070000f57a000 (most recent call first):
> >   File "/Users/deepaknagaraj/dev/utils/kafka_to_fs/worker.py", line 27 in read_kafka_record
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1639 in <lambda>
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228 in process_encoded
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1002 in process_bundle
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625 in process_bundle
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 587 in do_instruction
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 347 in <lambda>
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 274 in _execute
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 346 in task
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37 in run
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53 in run
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap
> >
> > Thread 0x000070001a61e000 (most recent call first):
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 433 in acquire
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 57 in run
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap
> >
> > Thread 0x0000700018618000 (most recent call first):
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 1202 in invoke_excepthook
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 934 in _bootstrap_inner
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap
> >
> > Thread 0x0000700017615000 (most recent call first):
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line 170 in get
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 581 in _write_outputs
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py", line 203 in consume_request_iterator
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap
> >
> > Thread 0x000070001460c000 (most recent call first):
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line 170 in get
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 987 in request_iter
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py", line 203 in consume_request_iterator
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap
> >
> > Thread 0x000070000c571000 (most recent call first):
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 433 in acquire
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 57 in run
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap
> >
> > Thread 0x0000700013586000 (most recent call first):
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line 170 in get
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 233 in get_responses
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py", line 203 in consume_request_iterator
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap
> >
> > Thread 0x0000700011580000 (most recent call first):
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 306 in wait
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 558 in wait
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 214 in run
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
> >   File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap
> >
> > Current thread 0x000000010b173600 (most recent call first):
> > <no Python frame>
> > ^C
> >
>
