I confirm the situation gets better after the commit: 4 cores used for 18
seconds rather than one core used for 50 seconds.

I still need to check whether this fixed the original issue which was with
tensorflow_data_validation.

But it is definitely a step in the right direction for me to understand the issue.
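
A rough way to turn the top observation into a number is to divide the CPU time consumed by a run by its wall-clock time. The sketch below is illustrative only and is not how the figures above were obtained (those came from top). It assumes a Unix system, and that the pipeline runs as a child process whose workers are waited for before it exits, so that their CPU time is included in the totals.

```python
import os
import subprocess
import sys
import time


def average_cores_used(cmd):
    """Run cmd and return (wall_seconds, cpu_seconds, cpu_seconds / wall_seconds)."""
    wall_start = time.perf_counter()
    cpu_start = os.times()
    subprocess.run(cmd, check=True)
    cpu_end = os.times()
    wall = time.perf_counter() - wall_start
    # children_user / children_system accumulate the CPU time of child
    # processes that have been waited for (these fields are zero on Windows).
    cpu = ((cpu_end.children_user - cpu_start.children_user)
           + (cpu_end.children_system - cpu_start.children_system))
    return wall, cpu, cpu / wall
```

For example, `average_cores_used([sys.executable, 'count_pipeline.py'])`, where `count_pipeline.py` is a hypothetical file holding the snippet quoted below. A ratio close to 1 means the run was effectively single-core; a ratio close to 4 means all four workers were kept busy.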

On Wed, Jan 29, 2020 at 9:57 PM Robert Bradshaw <rober...@google.com> wrote:

> This could influence how sources are read. When you say before/after
> the commit, is it better now?
>
> On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye <jlaf...@gmail.com> wrote:
> >
> > I took some time to bisect my issue between v2.16.0 & v2.17.0 and it looks like the commit below made a difference.
> >
> > before the commit: execution time is 50 seconds using the fn_api_runner in multiprocess mode on 4 workers
> > after the commit: execution time is 18 seconds using the fn_api_runner in multiprocess mode on 4 workers
> >
> > commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
> > Author: Robert Bradshaw <rober...@gmail.com>
> > Date:   Fri Oct 18 15:33:10 2019 -0700
> >
> >    Make beam_fn_api opt-out rather than opt-in for runners.
> >
> >    Also delegate this decision to the runner, rather than checking the string
> >    value of the flag.
> >
> > I looked at the modifications made in this patch but none struck me as related to my issue.
> >
> > On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <jlaf...@gmail.com> wrote:
> >>
> >> Hi Hannah,
> >>
> >> I used top.
> >>
> >> Please let me know if you need any other information that could help me understand the issue.
> >>
> >> J.
> >>
> >> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <hannahji...@google.com> wrote:
> >>>
> >>> Hi Julien
> >>>
> >>> Thanks for reaching out to the user community. I will look into it. Can you please share how you checked CPU usage for each core?
> >>>
> >>> Thanks,
> >>> Hannah
> >>>
> >>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I have a set of tfrecord files, obtained by converting parquet files with Spark. Each file is roughly 1GB and I have 11 of those.
> >>>>
> >>>> I would expect simple statistics gathering (i.e. counting the number of items across all files) to scale linearly with respect to the number of cores on my system.
> >>>>
> >>>> I am able to reproduce the issue with the minimal snippet below
> >>>>
> >>>> import apache_beam as beam
> >>>> from apache_beam.options.pipeline_options import PipelineOptions
> >>>> from apache_beam.runners.portability import fn_api_runner
> >>>> from apache_beam.portability.api import beam_runner_api_pb2
> >>>> from apache_beam.portability import python_urns
> >>>> import sys
> >>>>
> >>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> >>>>
> >>>> file_pattern = 'part-r-00*'
> >>>> runner=fn_api_runner.FnApiRunner(
> >>>>           default_environment=beam_runner_api_pb2.Environment(
> >>>>               urn=python_urns.SUBPROCESS_SDK,
> >>>>               payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> >>>>                         % sys.executable.encode('ascii')))
> >>>>
> >>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
> >>>>
> >>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
> >>>>            | beam.combiners.Count.Globally()
> >>>>            | beam.io.WriteToText('/tmp/output'))
> >>>>
> >>>> p.run()
> >>>>
> >>>> Only one combination of apache_beam revision / worker type seems to work (I refer to https://beam.apache.org/documentation/runners/direct/ for the worker types):
> >>>> * beam 2.16: neither multithread nor multiprocess achieves high CPU usage on multiple cores
> >>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
> >>>> * beam 2.18: not tested the multithreaded mode, but the multiprocess mode fails when trying to serialize the Environment instance, most likely because of a change from 2.17 to 2.18.
> >>>>
> >>>> I also briefly tried SparkRunner with version 2.16 but was not able to achieve any throughput.
> >>>>
> >>>> What is the recommended way to achieve what I am trying to do? How can I troubleshoot?
> >>>>
> >>> --
> >>> Please help me know how I am doing: go/hannahjiang-feedback
>
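
For the expectation quoted above, that counting items should scale with the number of cores, a baseline outside Beam can help separate runner overhead from raw file reading. The following is a minimal sketch, not something taken from the thread: it assumes uncompressed TFRecord files with the standard framing (8-byte little-endian length, 4-byte length CRC, payload, 4-byte payload CRC), does not verify the CRCs, and does not decode the records. The names `count_records` and `count_all` are made up for illustration.

```python
import struct
from concurrent.futures import ProcessPoolExecutor


def count_records(path):
    """Count records in one TFRecord file by walking the length headers."""
    count = 0
    with open(path, "rb") as f:
        while True:
            header = f.read(8)  # little-endian uint64 payload length
            if not header:
                return count
            if len(header) < 8:
                raise ValueError("truncated record header in %s" % path)
            (length,) = struct.unpack("<Q", header)
            # Skip the 4-byte length CRC, the payload and the 4-byte payload
            # CRC. Seeking past the end of the file does not raise, so a
            # truncated final payload is still counted.
            f.seek(4 + length + 4, 1)
            count += 1


def count_all(paths, workers=4):
    """Count records across files, one file per task, in separate processes."""
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return sum(pool.map(count_records, paths))
```

It could be called as `count_all(sorted(glob.glob('part-r-00*')))` after `import glob`, from under an `if __name__ == '__main__':` guard on platforms that spawn worker processes. Because it only seeks over the payloads, it mostly measures I/O, so it gives a lower bound on the time a real pipeline needs rather than a like-for-like comparison.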
