I just tried the DirectRunner and did not see the problem there. It
does happen with Flink.
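One mechanism worth ruling out, offered only as a hypothesis: with Flink the Python SDK harness runs outside the runner's JVM. If its stdout ends up connected to a pipe that nobody reads (I have not confirmed that it does), print() blocks as soon as the OS pipe buffer is full. The repro writes about 1 MB (1000 lines of 1000 characters), far more than the default 64 KiB pipe capacity on Linux. The standalone sketch below shows that effect using only the standard library. CHILD and child_finishes() are hypothetical names, and the 3-second timeout is arbitrary.

```python
import subprocess
import sys

# Hypothetical child script: prints roughly the same amount of data as the
# Beam repro (1000 lines of 1000 characters, about 1 MB in total).
CHILD = "for _ in range(1000):\n    print('a' * 1000)\n"


def child_finishes(drain_stdout: bool, timeout: float = 3.0) -> bool:
    """Return True if the child exits within `timeout` seconds.

    The child's stdout is a pipe. If the parent never reads from it, the
    child's writes block once the OS pipe buffer is full, so the child
    stays stuck inside print() and never exits on its own.
    """
    with subprocess.Popen([sys.executable, "-c", CHILD],
                          stdout=subprocess.PIPE) as proc:
        try:
            if drain_stdout:
                # communicate() reads stdout until EOF, so writes never block.
                proc.communicate(timeout=timeout)
            else:
                # wait() does not read the pipe; the child stalls in print().
                proc.wait(timeout=timeout)
            return True
        except subprocess.TimeoutExpired:
            # Kill the stalled child so the with-block's final wait() returns.
            proc.kill()
            return False


if __name__ == "__main__":
    print("drained:  ", child_finishes(drain_stdout=True))
    print("undrained:", child_finishes(drain_stdout=False))
```

With drain_stdout=True the child exits normally. With drain_stdout=False it is still blocked in print() when the timeout expires. While one thread is blocked that way it holds the buffered writer's internal lock, so any other thread calling print() would wait on that lock. That could look like I/O-lock contention. This would fit a stall that shows up on Flink but not on the DirectRunner, where the Python code typically runs in the launching process. It does not show that Beam's worker is actually wired this way. Rerunning the repro with a much shorter string (say 'a'*10) would be a cheap way to test the idea.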

Deepak

On Mon, May 16, 2022 at 12:57 PM Robert Bradshaw <[email protected]> wrote:
>
> Is this just on Flink, or does this happen on other runners too?
>
> On Mon, May 16, 2022 at 12:39 PM Deepak Nagaraj
> <[email protected]> wrote:
> >
> > Hi Robert,
> >
> > On Mon, May 16, 2022 at 12:33 PM Robert Bradshaw <[email protected]> wrote:
> > >
> > > On Mon, May 16, 2022 at 12:01 PM Deepak Nagaraj
> > > <[email protected]> wrote:
> > > >
> > >
> > > I can imagine contention for an I/O lock, but I'm not sure how that
> > > would lead to a deadlock. But hopefully knowing that print() is
> > > involved should allow a more minimal reproduction of the issue.
> > >
> >
> > Yes, I've enclosed a minimal pipeline that reproduces the problem [1],
> > as well as the last set of logs [2]. Per the logs, the problem occurs
> > even with a single worker thread.
> >
> > Thanks,
> > Deepak
> >
> > [1] Minimal Beam pipeline that stalls due to deadlock:
> >
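> > import apache_beam as beam
> > from apache_beam.options.pipeline_options import PipelineOptions
> >
> > def _run_pipeline(pipeline):
> >     def process_data(unused):
> >         # Each element writes a 1000-character line to stdout.
> >         print('a'*1000)
> >
> >     _ = (
> >             pipeline
> >             | "Create" >> beam.Create(['a']*1000)
> >             | "Process" >> beam.Map(process_data)
> >     )
> >     pipeline.run().wait_until_finish()
> >
> > # Runner flags (e.g. --runner=FlinkRunner) are parsed from sys.argv.
> > _run_pipeline(beam.Pipeline(options=PipelineOptions()))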
> >
> > [2] Last set of logs from the Python worker pool:
> >
> > DEBUG:apache_beam.runners.worker.sdk_worker:Got work 90
> > DEBUG:apache_beam.runners.worker.sdk_worker:Currently using 1 threads.
> > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > Process output_tags=['None'], receivers=[ConsumerSet[Process.out0,
> > coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> > DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation
> > Create/Map(decode) output_tags=['None'],
> > receivers=[SingletonConsumerSet[Create/Map(decode).out0,
> > coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> > DEBUG:apache_beam.runners.worker.bundle_processor:start
> > <DataInputOperation fn/read/ref_PCollection_PCollection_7:0
> > receivers=[SingletonConsumerSet[fn/read/ref_PCollection_PCollection_7:0.out0,
> > coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
