I confirm the situation gets better after the commit: 4 cores used for 18 seconds rather than one core used for 50 seconds.
I still need to check whether this fixed the original issue, which was with tensorflow_data_validation. But it is definitely a step in the right direction toward understanding the issue.

On Wed, Jan 29, 2020 at 9:57 PM Robert Bradshaw <rober...@google.com> wrote:
> This could influence how sources are read. When you say before/after
> the commit, is it better now?
>
> On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye <jlaf...@gmail.com> wrote:
> >
> > I took some time to bisect my issue between v2.16.0 & v2.17.0 and it looks like the commit below made a difference.
> >
> > before the commit: execution time is 50 seconds using the fn_api_runner in multiprocess mode on 4 workers
> > after the commit: execution time is 18 seconds using the fn_api_runner in multiprocess mode on 4 workers
> >
> > commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
> > Author: Robert Bradshaw <rober...@gmail.com>
> > Date:   Fri Oct 18 15:33:10 2019 -0700
> >
> >     Make beam_fn_api opt-out rather than opt-in for runners.
> >
> >     Also delegate this decision to the runner, rather than checking the
> >     string value of the flag.
> >
> > I looked at the modifications done in this patch, but none struck me as related to my issue.
> >
> > On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <jlaf...@gmail.com> wrote:
> >>
> >> Hi Hannah,
> >>
> >> I used top.
> >>
> >> Please let me know if you need any other information that could help me understand the issue.
> >>
> >> J.
> >>
> >> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <hannahji...@google.com> wrote:
> >>>
> >>> Hi Julien,
> >>>
> >>> Thanks for reaching out to the user community. I will look into it. Can you please share how you checked CPU usage for each core?
> >>>
> >>> Thanks,
> >>> Hannah
> >>>
> >>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I have a set of tfrecord files, obtained by converting parquet files with Spark. Each file is roughly 1GB and I have 11 of those.
> >>>>
> >>>> I would expect simple statistics gathering (i.e. counting the number of items across all files) to scale linearly with respect to the number of cores on my system.
> >>>>
> >>>> I am able to reproduce the issue with the minimal snippet below:
> >>>>
> >>>> import apache_beam as beam
> >>>> from apache_beam.options.pipeline_options import PipelineOptions
> >>>> from apache_beam.runners.portability import fn_api_runner
> >>>> from apache_beam.portability.api import beam_runner_api_pb2
> >>>> from apache_beam.portability import python_urns
> >>>> import sys
> >>>>
> >>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> >>>>
> >>>> file_pattern = 'part-r-00*'
> >>>> runner = fn_api_runner.FnApiRunner(
> >>>>     default_environment=beam_runner_api_pb2.Environment(
> >>>>         urn=python_urns.SUBPROCESS_SDK,
> >>>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> >>>>                 % sys.executable.encode('ascii')))
> >>>>
> >>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
> >>>>
> >>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
> >>>>            | beam.combiners.Count.Globally()
> >>>>            | beam.io.WriteToText('/tmp/output'))
> >>>>
> >>>> p.run()
> >>>>
> >>>> Only one combination of apache_beam revision / worker type seems to work (I refer to https://beam.apache.org/documentation/runners/direct/ for the worker types):
> >>>> * beam 2.16: neither the multithread nor the multiprocess mode achieves high CPU usage on multiple cores
> >>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
> >>>> * beam 2.18: I have not tested the multithreaded mode, but the multiprocess mode fails when trying to serialize the Environment instance, most likely because of a change from 2.17 to 2.18.
> >>>>
> >>>> I also briefly tried the SparkRunner with version 2.16 but was not able to achieve any throughput.
> >>>>
> >>>> What is the recommended way to achieve what I am trying to do? How can I troubleshoot?
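
For reference, below is a cleaned-up, runnable version of the minimal snippet quoted above, with the mail wrapping and the missing closing quote in file_pattern repaired. The 'part-r-00*' glob and the /tmp/output path are the thread's own placeholders and need to be adapted to your data; the FnApiRunner / SUBPROCESS_SDK configuration is the Beam 2.16/2.17-era API discussed in the thread and may differ in later releases, so treat this as a sketch rather than a current recommendation.

# Minimal sketch, assuming Beam 2.16/2.17-style portability APIs.
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

# Ask the fn_api_runner for 4 local workers.
pipeline_options = PipelineOptions(['--direct_num_workers', '4'])

# Placeholder glob for the ~11 x 1GB TFRecord files mentioned in the thread.
file_pattern = 'part-r-00*'

# Multiprocess mode: each worker is a separate Python SDK harness process,
# started via `python -m apache_beam.runners.worker.sdk_worker_main`, so the
# work can spread across several cores instead of a single one.
runner = fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
        urn=python_urns.SUBPROCESS_SDK,
        payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
                % sys.executable.encode('ascii')))

p = beam.Pipeline(runner=runner, options=pipeline_options)

_ = (p
     | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
     | 'count' >> beam.combiners.Count.Globally()
     | 'write' >> beam.io.WriteToText('/tmp/output'))

p.run().wait_until_finish()

Because each SDK harness runs as its own OS process in this mode, per-core utilization can be observed directly in top, which is how the improvement reported at the top of the thread (one core for 50 seconds versus four cores for 18 seconds) was measured.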