Hi Julien,

Here are some more updates about the issue.
When we run the DirectRunner in multiprocessing or multithreading mode, workers are created as expected. However, there is an issue with the *_SDFBoundedSourceWrapper* class: some read transforms send all of their data to a single worker instead of distributing it across workers. As a result, when we check CPU usage, only one subprocess is working and the other workers are idle. Unless the pipeline contains some other transform that requires a reshuffle, the whole dataset is processed by a single worker, which is what happened to your pipeline.

I tried a workaround <https://github.com/apache/beam/pull/10729>, which rolls back iobase.py so that it does not use the _SDFBoundedSourceWrapper class and keeps the other changes as they are. With this change, data is distributed to multiple workers; however, it causes regressions in the SDF wrapper tests, so it didn't work.

I created a ticket <https://issues.apache.org/jira/browse/BEAM-9228> for the issue.

Thanks for reporting the issue.
Hannah

On Wed, Jan 29, 2020 at 4:33 PM Hannah Jiang <[email protected]> wrote:

> I have investigated some more.
> The above commit found by Julien fixed it for 2.17.0, but another commit
> <https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60>
> broke it in 2.18, 2.19 (which will be released soon), and head.
> The root cause hasn't been identified yet; I will keep working on it.
>
> So at the moment, only 2.17 works as expected.
> Also, from 2.18 on, the environment setting for FnApiRunner was simplified; you can
> set the runner with the following code. The documentation will be updated
> soon.
> -----------------------------------------------------------------------
>
> import sys
>
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.transforms import environments
>
> # Launch each SDK worker as a subprocess of the current Python interpreter.
> command_string = '%s -m apache_beam.runners.worker.sdk_worker_main' % sys.executable
> runner = fn_api_runner.FnApiRunner(
>     default_environment=environments.SubprocessSDKEnvironment(
>         command_string=command_string))
>
> -----------------------------------------------------------------------
>
> On Wed, Jan 29, 2020 at 2:09 PM Kyle Weaver <[email protected]> wrote:
>
>> > I also tried briefly SparkRunner with version 2.16 but was not able to achieve any throughput.
>>
>> What do you mean by this?
>>
>> On Wed, Jan 29, 2020 at 1:20 PM Julien Lafaye <[email protected]> wrote:
>>
>>> I confirm the situation gets better after the commit: 4 cores used for 18 seconds rather than one core used for 50 seconds.
>>>
>>> I still need to check whether this fixed the original issue, which was with tensorflow_data_validation.
>>>
>>> But definitely a step for me in the right direction to understand the issue.
>>>
>>> On Wed, Jan 29, 2020 at 9:57 PM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> This could influence how sources are read. When you say before/after the commit, is it better now?
>>>>
>>>> On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye <[email protected]> wrote:
>>>> >
>>>> > I took some time to bisect my issue between v2.16.0 & v2.16.0 and it looks like the commit below made a difference.
>>>> >
>>>> > before the commit: execution time is 50 seconds using the fn_api_runner in multiprocess mode on 4 workers
>>>> > after the commit: execution time is 18 seconds using the fn_api_runner in multiprocess mode on 4 workers
>>>> >
>>>> > commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
>>>> > Author: Robert Bradshaw <[email protected]>
>>>> > Date: Fri Oct 18 15:33:10 2019 -0700
>>>> >
>>>> >     Make beam_fn_api opt-out rather than opt-in for runners.
>>>> >
>>>> >     Also delegate this decision to the runner, rather than checking the string
>>>> >     value of the flag.
>>>> >
>>>> > I looked at the modifications done in this patch but none struck me as related to my issue.
>>>> >
>>>> > On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <[email protected]> wrote:
>>>> >>
>>>> >> Hi Hannah,
>>>> >>
>>>> >> I used top.
>>>> >>
>>>> >> Please let me know if you need any other information that could help me understand the issue.
>>>> >>
>>>> >> J.
>>>> >>
>>>> >> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <[email protected]> wrote:
>>>> >>>
>>>> >>> Hi Julien
>>>> >>>
>>>> >>> Thanks for reaching out to the user community. I will look into it. Can you please share how you checked CPU usage for each core?
>>>> >>>
>>>> >>> Thanks,
>>>> >>> Hannah
>>>> >>>
>>>> >>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <[email protected]> wrote:
>>>> >>>>
>>>> >>>> Hello,
>>>> >>>>
>>>> >>>> I have a set of tfrecord files, obtained by converting parquet files with Spark. Each file is roughly 1GB and I have 11 of those.
>>>> >>>>
>>>> >>>> I would expect simple statistics gathering (i.e. counting the number of items in all files) to scale linearly with respect to the number of cores on my system.
>>>> >>>>
>>>> >>>> I am able to reproduce the issue with the minimal snippet below:
>>>> >>>>
>>>> >>>> import apache_beam as beam
>>>> >>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> >>>> from apache_beam.runners.portability import fn_api_runner
>>>> >>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>> >>>> from apache_beam.portability import python_urns
>>>> >>>> import sys
>>>> >>>>
>>>> >>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>>>> >>>>
>>>> >>>> file_pattern = 'part-r-00*'
>>>> >>>> runner = fn_api_runner.FnApiRunner(
>>>> >>>>     default_environment=beam_runner_api_pb2.Environment(
>>>> >>>>         urn=python_urns.SUBPROCESS_SDK,
>>>> >>>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>>>> >>>>         % sys.executable.encode('ascii')))
>>>> >>>>
>>>> >>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>>>> >>>>
>>>> >>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>>>> >>>>          | beam.combiners.Count.Globally()
>>>> >>>>          | beam.io.WriteToText('/tmp/output'))
>>>> >>>>
>>>> >>>> p.run()
>>>> >>>>
>>>> >>>> Only one combination of apache_beam revision / worker type seems to work (I refer to https://beam.apache.org/documentation/runners/direct/ for the worker types):
>>>> >>>> * beam 2.16: neither multithread nor multiprocess achieves high CPU usage on multiple cores
>>>> >>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
>>>> >>>> * beam 2.18: I have not tested the multithreaded mode, but the multiprocess mode fails when trying to serialize the Environment instance, most likely because of a change from 2.17 to 2.18.
>>>> >>>>
>>>> >>>> I also tried briefly SparkRunner with version 2.16 but was not able to achieve any throughput.
>>>> >>>>
>>>> >>>> What is the recommended way to achieve what I am trying to do? How can I troubleshoot?
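On the troubleshooting question, one way to separate what the machine can do from what the runner is doing is a Beam-independent baseline that counts records with plain Python processes, one file per task. The sketch below is only an illustration and not part of any Beam fix. It assumes uncompressed TFRecord files with the standard framing (8-byte little-endian length, 4-byte CRC of the length, payload, 4-byte CRC of the payload), it skips CRC validation, and the helper names count_records and count_all are hypothetical.

```python
import struct
import sys
from multiprocessing import Pool


def count_records(path):
    """Count records in one uncompressed TFRecord file without decoding them."""
    count = 0
    with open(path, 'rb') as f:
        while True:
            header = f.read(12)  # 8-byte length + 4-byte CRC of the length
            if not header:
                break  # clean end of file
            if len(header) < 12:
                raise ValueError('truncated record header in %s' % path)
            (length,) = struct.unpack('<Q', header[:8])
            # Skip the payload and its 4-byte CRC instead of reading them.
            # Note: a truncated final payload is not detected by this sketch.
            f.seek(length + 4, 1)
            count += 1
    return count


def count_all(paths, processes=4):
    """Count records across files, handing one file at a time to each process."""
    with Pool(processes) as pool:
        return sum(pool.map(count_records, paths))


if __name__ == '__main__':
    files = sys.argv[1:]
    if files:
        print(count_all(files))
```

Running it over the part-r-00* files while watching top shows whether all four cores get busy. If they do, that would suggest the bottleneck is in how the runner distributes the read rather than in the files or the disk.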
