This issue will be fixed in v2.20. PR: https://github.com/apache/beam/pull/10847
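The thread traces the slowdown to a read whose output was never redistributed. As Hannah puts it, unless some transform requires a reshuffle, the whole dataset stays on one worker.

The stdlib-only toy model here is not Beam's scheduler. It contrasts two cases:
- A fused stage, where every element stays on the worker that read it.
- A reshuffle, which Beam's `Reshuffle` conceptually implements by pairing elements with random keys and grouping by key.

Routing keys to workers with `key % num_workers` and all function names are assumptions made for illustration.

```python
import random
from collections import Counter
from typing import Dict, Iterable, List


def fused_assignment(elements: Iterable[int], reader_worker: int = 0) -> Dict[int, List[int]]:
    """Without a fusion break, downstream steps run where the read ran.

    A single unsplit read on one worker therefore keeps every element there.
    """
    return {reader_worker: list(elements)}


def reshuffled_assignment(elements: Iterable[int], num_workers: int,
                          seed: int = 0) -> Dict[int, List[int]]:
    """Toy reshuffle: attach a random key to each element, route keys to workers.

    Mimics the placement effect of "random key + GroupByKey"; routing by
    key % num_workers is an assumption of this model, not Beam's behavior.
    """
    rng = random.Random(seed)
    buckets: Dict[int, List[int]] = {w: [] for w in range(num_workers)}
    for element in elements:
        key = rng.getrandbits(32)              # random 32-bit key per element
        buckets[key % num_workers].append(element)
    return buckets


def load(buckets: Dict[int, List[int]]) -> Counter:
    """Number of elements each worker would process."""
    return Counter({w: len(items) for w, items in buckets.items()})


if __name__ == "__main__":
    records = range(10_000)
    print("fused:     ", dict(load(fused_assignment(records))))
    print("reshuffled:", dict(load(reshuffled_assignment(records, num_workers=4))))
```

Running it puts every element on worker 0 in the fused case and spreads them roughly evenly over four workers after the reshuffle. That mirrors the reported symptom of one busy subprocess and three idle ones.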
On Fri, Jan 31, 2020 at 9:52 AM Hannah Jiang wrote:
> Yeap, here is the Jira ticket: BEAM-9228
> <https://issues.apache.org/jira/browse/BEAM-9228>
> I just confirmed that the reshuffle operation is not being called at
> https://github.com/apache/beam/blob/release-2.19.0/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py#L926
>
> On Fri, Jan 31, 2020 at 9:43 AM Robert Bradshaw wrote:
>> It's only optionally inserting a reshuffle:
>>
>> https://github.com/apache/beam/blob/release-2.19.0/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py#L926
>>
>> We should at least have a fusion break (marking the downstream stage
>> as a must-follow of the upstream stage) if reshuffle is not a primitive.
>> We also clearly need a better test. I'd be happy to consult on either.
>> Hannah, is there a JIRA for this yet?
>>
>> On Fri, Jan 31, 2020 at 8:41 AM Luke Cwik wrote:
>>> Is the DirectRunner inserting a reshuffle or redistribute operation
>>> within the SplittableDoFn transform expansion so it looks like
>>> (PairWithRestriction -> SplitRestriction -> Reshuffle ->
>>> ProcessElementAndRestriction)?
>>>
>>> On Thu, Jan 30, 2020 at 3:32 PM Hannah Jiang wrote:
>>>> Hi Julien
>>>>
>>>> Here are some more updates about the issue.
>>>>
>>>> When we run DirectRunner in multiprocessing or multithreading mode,
>>>> workers are created as expected. However, there are issues with the
>>>> _SDFBoundedSourceWrapper class, so some read transforms send all data
>>>> to a single worker instead of distributing it across workers. Therefore,
>>>> when we check CPU usage, only one subprocess is working and the other
>>>> workers are idle. Unless some other transform requires a reshuffle, the
>>>> dataset is processed by a single worker, which is what happened with
>>>> your pipeline.
>>>>
>>>> I tried a workaround: rolling back iobase.py so that it no longer uses
>>>> the _SDFBoundedSourceWrapper class, while keeping the other changes as
>>>> they are. With this change, data is distributed to multiple workers;
>>>> however, it causes regressions in the SDF wrapper tests, so it didn't
>>>> work. I created a ticket for the issue.
>>>>
>>>> Thanks for reporting the issue.
>>>> Hannah
>>>>
>>>> On Wed, Jan 29, 2020 at 4:33 PM Hannah Jiang wrote:
>>>>> I have investigated some more.
>>>>> The commit Julien found fixed it for 2.17.0, but another commit broke
>>>>> it again in 2.18, 2.19 (which will be released soon), and HEAD.
>>>>> The root cause hasn't been identified yet; I will keep working on it.
>>>>>
>>>>> So at the moment, only 2.17 works as expected.
>>>>> Also, from 2.18 the environment setting for FnApiRunner was simplified;
>>>>> you can set up the runner with the following code. The documentation
>>>>> will be updated soon.
>>>>> -----------------------------------------------------------------------
>>>>>
>>>>> import sys
>>>>>
>>>>> from apache_beam.runners.portability import fn_api_runner
>>>>> from apache_beam.transforms import environments
>>>>>
>>>>> command_string = '%s -m apache_beam.runners.worker.sdk_worker_main' % sys.executable
>>>>> runner = fn_api_runner.FnApiRunner(
>>>>>     default_environment=environments.SubprocessSDKEnvironment(
>>>>>         command_string=command_string))
>>>>>
>>>>> -----------------------------------------------------------------------
>>>>>
>>>>> On Wed, Jan 29, 2020 at 2:09 PM Kyle Weaver wrote:
>>>>>> > I also tried briefly SparkRunner with version 2.16 but was not able
>>>>>> > to achieve any throughput.
>>>>>>
>>>>>> What do you mean by this?
>>>>>>
>>>>>> On Wed, Jan 29, 2020 at 1:20 PM Julien Lafaye wrote:
>>>>>>> I confirm the situation is better after the commit: 4 cores are
>>>>>>> used for 18 seconds rather than one core for 50 seconds.
>>>>>>>
>>>>>>> I still need to check whether this fixes my original issue, which
>>>>>>> was with tensorflow_data_validation.
>>>>>>>
>>>>>>> But it is definitely a step in the right direction for me to
>>>>>>> understand the issue.
>>>>>>>
>>>>>>> On Wed, Jan 29, 2020 at 9:57 PM Robert Bradshaw wrote:
>>>>>>>> This could influence how sources are read. When you say before/after
>>>>>>>> the commit, is it better now?
>>>>>>>>
>>>>>>>> On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye wrote:
>>>>>>>>> I took some time to bisect my issue between v2.16.0 and v2.17.0,
>>>>>>>>> and it looks like the commit below made a difference.
>>>>>>>>>
>>>>>>>>> before the commit: execution time is 50 seconds using the
>>>>>>>>> fn_api_runner in multiprocess mode on 4 workers
>>>>>>>>> after the commit: execution time is 18 seconds using the
>>>>>>>>> fn_api_runner in multiprocess mode on 4 workers
>>>>>>>>>
>>>>>>>>> commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
>>>>>>>>> Author: Robert Bradshaw
>>>>>>>>> Date:   Fri Oct 18 15:33:10 2019 -0700
>>>>>>>>>
>>>>>>>>>     Make beam_fn_api opt-out rather than opt-in for runners.
>>>>>>>>>
>>>>>>>>>     Also delegate this decision to the runner, rather than checking the string
>>>>>>>>>     value of the flag.
>>>>>>>>>
>>>>>>>>> I looked at the modifications made in this patch, but none of them
>>>>>>>>> struck me as related to my issue.
>>>>>>>>>
>>>>>>>>> On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye wrote:
>>>>>>>>>> Hi Hannah,
>>>>>>>>>>
>>>>>>>>>> I used top.
>>>>>>>>>>
>>>>>>>>>> Please let me know if you need any other information that could
>>>>>>>>>> help me understand the issue.
>>>>>>>>>>
>>>>>>>>>> J.
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang wrote:
>>>>>>>>>>> Hi Julien
>>>>>>>>>>>
>>>>>>>>>>> Thanks for reaching out to the user community. I will look into it.
>>>>>>>>>>> Can you please share how you checked CPU usage for each core?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Hannah
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye wrote:
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a set of TFRecord files obtained by converting Parquet
>>>>>>>>>>>> files with Spark. Each file is roughly 1 GB, and I have 11 of them.
>>>>>>>>>>>>
>>>>>>>>>>>> I would expect simple statistics gathering (i.e. counting the
>>>>>>>>>>>> number of items in all files) to scale linearly with the number
>>>>>>>>>>>> of cores on my system.
>>>>>>>>>>>>
>>>>>>>>>>>> I am able to reproduce the issue with the minimal snippet below:
>>>>>>>>>>>>
>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>>> from apache_beam.runners.portability import fn_api_runner
>>>>>>>>>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>>>>>>>>>> from apache_beam.portability import python_urns
>>>>>>>>>>>> import sys
>>>>>>>>>>>>
>>>>>>>>>>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>>>>>>>>>>>>
>>>>>>>>>>>> file_pattern = 'part-r-00*'
>>>>>>>>>>>> runner = fn_api_runner.FnApiRunner(
>>>>>>>>>>>>     default_environment=beam_runner_api_pb2.Environment(
>>>>>>>>>>>>         urn=python_urns.SUBPROCESS_SDK,
>>>>>>>>>>>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>>>>>>>>>>>>         % sys.executable.encode('ascii')))
>>>>>>>>>>>>
>>>>>>>>>>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>>>>>>>>>>>>
>>>>>>>>>>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>>>>>>>>>>>>          | beam.combiners.Count.Globally()
>>>>>>>>>>>>          | beam.io.WriteToText('/tmp/output'))
>>>>>>>>>>>>
>>>>>>>>>>>> p.run()
>>>>>>>>>>>>
>>>>>>>>>>>> Only one combination of apache_beam revision / worker type seems
>>>>>>>>>>>> to work (I refer to https://beam.apache.org/documentation/runners/direct/
>>>>>>>>>>>> for the worker types):
>>>>>>>>>>>> * beam 2.16: neither multithread nor multiprocess mode achieves
>>>>>>>>>>>>   high CPU usage on multiple cores
>>>>>>>>>>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
>>>>>>>>>>>> * beam 2.18: multithreaded mode not tested, but multiprocess mode
>>>>>>>>>>>>   fails when trying to serialize the Environment instance, most
>>>>>>>>>>>>   likely because of a change from 2.17 to 2.18.
>>>>>>>>>>>>
>>>>>>>>>>>> I also tried briefly SparkRunner with version 2.16 but was not
>>>>>>>>>>>> able to achieve any throughput.
>>>>>>>>>>>>
>>>>>>>>>>>> What is the recommended way to achieve what I am trying to do?
>>>>>>>>>>>> How can I troubleshoot?
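For the troubleshooting question that opens the thread, one Beam-independent baseline is to count records directly, one file per process, and compare that timing with the pipeline's. This is a sketch under several assumptions:
- The files are uncompressed.
- They use the standard TFRecord framing: an 8-byte little-endian length, a 4-byte masked CRC of the length, the payload, and a 4-byte masked CRC of the payload.

The sketch does not verify CRCs, and `count_records` / `count_all` are hypothetical helper names.

```python
import glob
import os
import struct
import sys
from concurrent.futures import ProcessPoolExecutor
from typing import Iterable

# TFRecord framing per record (little-endian): uint64 length, uint32 masked
# CRC of the length, `length` payload bytes, uint32 masked CRC of the payload.
_HEADER = struct.Struct("<QI")
_FOOTER_SIZE = 4


def count_records(path: str) -> int:
    """Count records in one uncompressed TFRecord file (CRCs are not checked)."""
    count = 0
    with open(path, "rb") as f:
        size = os.fstat(f.fileno()).st_size
        while True:
            header = f.read(_HEADER.size)
            if not header:
                return count                          # clean end of file
            if len(header) < _HEADER.size:
                raise ValueError(f"truncated header in {path}")
            length, _length_crc = _HEADER.unpack(header)
            f.seek(length + _FOOTER_SIZE, os.SEEK_CUR)  # skip payload + its CRC
            if f.tell() > size:
                raise ValueError(f"truncated record in {path}")
            count += 1


def count_all(paths: Iterable[str], workers: int = 4) -> int:
    """Count records across files, one file per task, in separate processes."""
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return sum(pool.map(count_records, paths))


if __name__ == "__main__" and len(sys.argv) > 1:
    # Quote the pattern in the shell so glob expansion happens here.
    print(count_all(sorted(glob.glob(sys.argv[1]))))
```

Usage would be something like `python count_tfrecords.py 'part-r-00*'`, where the script name is hypothetical. Counting is independent per file, so this baseline may scale with the number of processes while the pipeline does not. In that case, the bottleneck is more likely in how the read is distributed than in disk throughput.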

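Hannah asked how per-core CPU usage was checked, and Julien used `top`. A scriptable alternative is to compare the CPU time a run consumes with its wall-clock time. A ratio near 1 suggests one busy core; a ratio near 4 suggests four.

This is a sketch assuming a Unix system, and `run_and_measure` and `effective_cores` are hypothetical helpers. Note that `resource.RUSAGE_CHILDREN` only counts descendants that have exited and been waited for, so worker subprocesses the job does not reap are missed.

```python
import resource
import subprocess
import sys
import time
from typing import List, Tuple


def effective_cores(cpu_seconds: float, wall_seconds: float) -> float:
    """Average number of busy cores: CPU time divided by elapsed wall time."""
    if wall_seconds <= 0:
        raise ValueError("wall_seconds must be positive")
    return cpu_seconds / wall_seconds


def _children_cpu() -> float:
    """User + system CPU seconds of terminated, waited-for descendants."""
    usage = resource.getrusage(resource.RUSAGE_CHILDREN)
    return usage.ru_utime + usage.ru_stime


def run_and_measure(cmd: List[str]) -> Tuple[float, float]:
    """Run `cmd` to completion; return (wall seconds, CPU seconds it used)."""
    cpu_before = _children_cpu()
    start = time.monotonic()
    subprocess.run(cmd, check=True)
    wall = time.monotonic() - start
    return wall, _children_cpu() - cpu_before


if __name__ == "__main__" and len(sys.argv) > 1:
    wall, cpu = run_and_measure(sys.argv[1:])
    print(f"wall={wall:.1f}s cpu={cpu:.1f}s cores~{effective_cores(cpu, wall):.2f}")
```

For example, `python measure.py python my_pipeline.py` (both script names hypothetical) would report the effective core count for one pipeline run.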