I just tried DirectRunner -- I did not see any problem with it. It does happen with Flink.
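
For anyone who wants to compare the two runners, here is a rough sketch of how the pipeline in [1] (quoted below) could be pointed at either one. It assumes a local Flink cluster and an external Python worker pool. The helper names `runner_args` and `run_repro` and both `localhost` addresses are hypothetical placeholders, not the exact setup used in this thread.

```python
def runner_args(runner):
    """Return pipeline flags for the given runner name.

    The host:port values are assumed placeholders; adjust to your setup.
    """
    if runner == "DirectRunner":
        return ["--runner=DirectRunner"]
    if runner == "FlinkRunner":
        return [
            "--runner=FlinkRunner",
            "--flink_master=localhost:8081",         # assumed Flink address
            "--environment_type=EXTERNAL",           # external Python worker pool
            "--environment_config=localhost:50000",  # assumed worker pool address
        ]
    raise ValueError("unsupported runner: %s" % runner)


def run_repro(runner):
    """Run the pipeline from [1] on the chosen runner."""
    # Imported here so runner_args() can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    def process_data(unused):
        # Same body as the repro in [1]: print 1000 characters per element.
        print('a' * 1000)

    pipeline = beam.Pipeline(options=PipelineOptions(runner_args(runner)))
    _ = (
        pipeline
        | "Create" >> beam.Create(['a'] * 1000)
        | "Process" >> beam.Map(process_data)
    )
    pipeline.run().wait_until_finish()
```

Calling `run_repro("DirectRunner")` and then `run_repro("FlinkRunner")` should exercise the same pipeline on both runners, so any difference in behavior comes from the runner and worker setup rather than the pipeline code.
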
Deepak

On Mon, May 16, 2022 at 12:57 PM Robert Bradshaw <[email protected]> wrote:
>
> Is this just on Flink, or does this happen on other runners too?
>
> On Mon, May 16, 2022 at 12:39 PM Deepak Nagaraj
> <[email protected]> wrote:
> >
> > Hi Robert,
> >
> > On Mon, May 16, 2022 at 12:33 PM Robert Bradshaw <[email protected]>
> > wrote:
> > >
> > > On Mon, May 16, 2022 at 12:01 PM Deepak Nagaraj
> > > <[email protected]> wrote:
> > > >
> > > >
> > >
> > > I can imagine contention for an I/O lock, but I'm not sure how that
> > > would lead to a deadlock. But hopefully knowing that print() is
> > > involved should allow a more minimal reproduction of the issue.
> > >
> >
> > Yes, I've enclosed [1] a minimal pipeline that reproduces the problem
> > as well as the last set of logs. Per the logs, the problem occurs even
> > with a single worker thread.
> >
> > Thanks,
> > Deepak
> >
> > [1] Minimal Beam pipeline that stalls due to deadlock:
> >
> > def _run_pipeline(pipeline):
> >     def process_data(unused):
> >         print('a'*1000)
> >
> >     _ = (
> >         pipeline
> >         | "Create" >> beam.Create(['a']*1000)
> >         | "Process" >> beam.Map(process_data)
> >     )
> >     pipeline.run().wait_until_finish()
> >
> > [2] Last set of logs from the Python worker pool:
> >
> > DEBUG:apache_beam.runners.worker.sdk_worker:Got work 90
> > DEBUG:apache_beam.runners.worker.sdk_worker:Currently using 1 threads.
> > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > Process output_tags=['None'], receivers=[ConsumerSet[Process.out0,
> > coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > Create/Map(decode) output_tags=['None'],
> > receivers=[SingletonConsumerSet[Create/Map(decode).out0,
> > coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> > DEBUG:apache_beam.runners.worker.bundle_processor:start
> > <DataInputOperation fn/read/ref_PCollection_PCollection_7:0
> > receivers=[SingletonConsumerSet[fn/read/ref_PCollection_PCollection_7:0.out0,
> > coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
