Yeap, here is the Jira ticket: BEAM-9228 <https://issues.apache.org/jira/browse/BEAM-9228>. I just confirmed that the reshuffle operation is not being called at https://github.com/apache/beam/blob/release-2.19.0/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py#L926.
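Luke's question further down the thread describes the expected SplittableDoFn expansion (PairWithRestriction -> SplitRestriction -> Reshuffle -> ProcessElementAndRestriction). The toy model below is a plain-Python sketch, not Beam code, of why skipping that reshuffle leaves a single worker busy. It assumes the source arrives as one input bundle on one worker and that, without a fusion break, the split restrictions are processed in the same fused stage. The function names and the byte-range restrictions are hypothetical stand-ins for the Beam transforms of similar names.

```python
from collections import defaultdict
from itertools import cycle


def pair_with_restriction(sizes):
    """Hypothetical stand-in: pair each file name with a (start, stop) byte range."""
    return [(name, (0, size)) for name, size in sizes.items()]


def split_restriction(pairs, chunk):
    """Hypothetical stand-in: cut each byte range into pieces of at most `chunk` bytes."""
    splits = []
    for name, (start, stop) in pairs:
        for lo in range(start, stop, chunk):
            splits.append((name, (lo, min(lo + chunk, stop))))
    return splits


def assign_to_workers(splits, num_workers, reshuffle):
    """Map worker index -> splits that worker would process in this toy model.

    Without a reshuffle (no fusion break), every split stays in the fused stage
    on the one worker that received the input bundle (worker 0 here).
    With a reshuffle, splits are redistributed round-robin across workers.
    """
    workers = defaultdict(list)
    if reshuffle:
        for worker, split in zip(cycle(range(num_workers)), splits):
            workers[worker].append(split)
    else:
        workers[0].extend(splits)
    return dict(workers)


splits = split_restriction(pair_with_restriction({'a': 100, 'b': 100}), chunk=50)
print(len(assign_to_workers(splits, 4, reshuffle=False)))  # 1 busy worker
print(len(assign_to_workers(splits, 4, reshuffle=True)))   # 4 busy workers
```

Round-robin is a deterministic simplification chosen to keep the model checkable; Beam's Python `Reshuffle` redistributes elements by assigning them random keys, so the real spread is only approximately even.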
On Fri, Jan 31, 2020 at 9:43 AM Robert Bradshaw <rober...@google.com> wrote:
> It's only optionally inserting a reshuffle:
>
> https://github.com/apache/beam/blob/release-2.19.0/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py#L926
>
> We should at least have a fusion break (marking the downstream stage
> as a must-follow of the upstream) if reshuffle is not a primitive. We
> also clearly need a better test... I'd be happy to consult on either.
> Hannah, is there a JIRA for this yet?
>
> On Fri, Jan 31, 2020 at 8:41 AM Luke Cwik <lc...@google.com> wrote:
> >
> > Is the DirectRunner inserting a reshuffle or redistribute operation
> > within the SplittableDoFn transform expansion so it looks like
> > (PairWithRestriction -> SplitRestriction -> Reshuffle ->
> > ProcessElementAndRestriction)?
> >
> > On Thu, Jan 30, 2020 at 3:32 PM Hannah Jiang <hannahji...@google.com> wrote:
> >>
> >> Hi Julien
> >>
> >> Here are some more updates about the issue.
> >>
> >> When we run multiprocessing or multithreading mode with DirectRunner,
> >> workers are created as expected. However, there are issues with the
> >> _SDFBoundedSourceWrapper class, so some read transforms send data to a
> >> single worker instead of distributing it across workers. Therefore, when
> >> we check CPU usage, only one subprocess is working and the other workers
> >> are idle. Unless some other transform requires a reshuffle, the dataset
> >> is processed by a single worker, which is what happened in your pipeline.
> >>
> >> I tried a workaround: rolling back iobase.py so that it does not use the
> >> _SDFBoundedSourceWrapper class, while keeping the other changes as they
> >> are. With this change, data is distributed to multiple workers; however,
> >> it causes regressions in the SDF wrapper tests, so it didn't work. I
> >> created a ticket for the issue.
> >>
> >> Thanks for reporting the issue.
> >> Hannah
> >>
> >> On Wed, Jan 29, 2020 at 4:33 PM Hannah Jiang <hannahji...@google.com> wrote:
> >>>
> >>> I have investigated some more.
> >>> The above commit found by Julien fixed it for 2.17.0, but another commit
> >>> broke it again in 2.18, 2.19 (which will be released soon) and head.
> >>> The root cause hasn't been identified yet; I will keep working on it.
> >>>
> >>> So at the moment, only 2.17 works as expected.
> >>> And from 2.18, the environment setting for FnApiRunner was simplified; you
> >>> can set up the runner with the following code. This will be updated in
> >>> the documentation soon.
> >>> -----------------------------------------------------------------------
> >>>
> >>> import sys
> >>>
> >>> from apache_beam.runners.portability import fn_api_runner
> >>> from apache_beam.transforms import environments
> >>>
> >>> # Launch each SDK worker as a subprocess of the current Python interpreter.
> >>> command_string = '%s -m apache_beam.runners.worker.sdk_worker_main' % sys.executable
> >>> runner = fn_api_runner.FnApiRunner(
> >>>     default_environment=environments.SubprocessSDKEnvironment(
> >>>         command_string=command_string))
> >>>
> >>> -------------------------------------------------
> >>>
> >>> On Wed, Jan 29, 2020 at 2:09 PM Kyle Weaver <kcwea...@google.com> wrote:
> >>>>
> >>>> > I also briefly tried SparkRunner with version 2.16 but was not able
> >>>> > to achieve any throughput.
> >>>>
> >>>> What do you mean by this?
> >>>>
> >>>> On Wed, Jan 29, 2020 at 1:20 PM Julien Lafaye <jlaf...@gmail.com> wrote:
> >>>>>
> >>>>> I confirm the situation gets better after the commit: 4 cores used
> >>>>> for 18 seconds rather than one core used for 50 seconds.
> >>>>>
> >>>>> I still need to check whether this fixed the original issue, which
> >>>>> was with tensorflow_data_validation.
> >>>>>
> >>>>> But it is definitely a step in the right direction for me to
> >>>>> understand the issue.
> >>>>>
> >>>>> On Wed, Jan 29, 2020 at 9:57 PM Robert Bradshaw <rober...@google.com> wrote:
> >>>>>>
> >>>>>> This could influence how sources are read. When you say before/after
> >>>>>> the commit, is it better now?
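Julien's before/after numbers (one core for 50 seconds versus 4 cores for 18 seconds) can be turned into a speedup and a parallel efficiency with a few lines of arithmetic. This is a sketch that assumes all 4 cores were fully busy for the whole 18 seconds, which the thread does not state precisely.

```python
def scaling_report(serial_seconds, parallel_seconds, cores):
    """Return (speedup, parallel efficiency, total core-seconds of the parallel run)."""
    speedup = serial_seconds / parallel_seconds
    efficiency = speedup / cores              # 1.0 would be perfectly linear scaling
    core_seconds = parallel_seconds * cores   # assumes every core was busy throughout
    return speedup, efficiency, core_seconds


speedup, efficiency, core_seconds = scaling_report(50.0, 18.0, 4)
print(f"speedup {speedup:.2f}x, efficiency {efficiency:.0%}, {core_seconds:.0f} core-seconds")
# speedup 2.78x, efficiency 69%, 72 core-seconds
```

Under that assumption the 4-core run reaches about 69% of linear scaling and spends 72 core-seconds against 50 for the single-core run; the extra 22 core-seconds would be overhead or idle time.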
> >>>>>>
> >>>>>> On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye <jlaf...@gmail.com> wrote:
> >>>>>> >
> >>>>>> > I took some time to bisect my issue between v2.16.0 & v2.17.0 and
> >>>>>> > it looks like the commit below made a difference.
> >>>>>> >
> >>>>>> > before the commit: execution time is 50 seconds using the
> >>>>>> > fn_api_runner in multiprocess mode on 4 workers
> >>>>>> > after the commit: execution time is 18 seconds using the
> >>>>>> > fn_api_runner in multiprocess mode on 4 workers
> >>>>>> >
> >>>>>> > commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
> >>>>>> > Author: Robert Bradshaw <rober...@gmail.com>
> >>>>>> > Date:   Fri Oct 18 15:33:10 2019 -0700
> >>>>>> >
> >>>>>> >     Make beam_fn_api opt-out rather than opt-in for runners.
> >>>>>> >
> >>>>>> >     Also delegate this decision to the runner, rather than checking the string
> >>>>>> >     value of the flag.
> >>>>>> >
> >>>>>> > I looked at the modifications done in this patch but none struck
> >>>>>> > me as related to my issue.
> >>>>>> >
> >>>>>> > On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <jlaf...@gmail.com> wrote:
> >>>>>> >>
> >>>>>> >> Hi Hannah,
> >>>>>> >>
> >>>>>> >> I used top.
> >>>>>> >>
> >>>>>> >> Please let me know if you need any other information that could
> >>>>>> >> help me understand the issue.
> >>>>>> >>
> >>>>>> >> J.
> >>>>>> >>
> >>>>>> >> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <hannahji...@google.com> wrote:
> >>>>>> >>>
> >>>>>> >>> Hi Julien
> >>>>>> >>>
> >>>>>> >>> Thanks for reaching out to the user community. I will look into it.
> >>>>>> >>> Can you please share how you checked CPU usage for each core?
> >>>>>> >>>
> >>>>>> >>> Thanks,
> >>>>>> >>> Hannah
> >>>>>> >>>
> >>>>>> >>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:
> >>>>>> >>>>
> >>>>>> >>>> Hello,
> >>>>>> >>>>
> >>>>>> >>>> I have a set of tfrecord files, obtained by converting parquet
> >>>>>> >>>> files with Spark. Each file is roughly 1GB and I have 11 of those.
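Julien's bisection (the `refs/bisect/new` ref in the log suggests `git bisect` with its `old`/`new` terms) is a binary search over the commits between the two releases. The sketch below shows that search in plain Python. It assumes the commits are ordered oldest to newest, that the first is known to be "old" and the last "new", and that `is_new` is a hypothetical callback which would build Beam at a commit and time the pipeline.

```python
from typing import Callable, Sequence


def first_new_commit(commits: Sequence[str], is_new: Callable[[str], bool]) -> str:
    """Binary search for the earliest commit where `is_new` holds.

    Assumes commits are ordered oldest -> newest, commits[0] is "old",
    commits[-1] is "new", and the property never flips back once it appears.
    """
    lo, hi = 0, len(commits) - 1  # invariant: commits[lo] is old, commits[hi] is new
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if is_new(commits[mid]):
            hi = mid
        else:
            lo = mid
    return commits[hi]


commits = [f"c{i}" for i in range(10)]
print(first_new_commit(commits, lambda c: int(c[1:]) >= 6))  # c6
```

Each probe costs one build-and-run, so the search needs roughly log2(N) pipeline runs instead of N, which is what makes bisecting a release's worth of commits practical.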
> >>>>>> >>>>
> >>>>>> >>>> I would expect simple statistics gathering (i.e. counting the number
> >>>>>> >>>> of items in all files) to scale linearly with the number of
> >>>>>> >>>> cores on my system.
> >>>>>> >>>>
> >>>>>> >>>> I am able to reproduce the issue with the minimal snippet below:
> >>>>>> >>>>
> >>>>>> >>>> import sys
> >>>>>> >>>>
> >>>>>> >>>> import apache_beam as beam
> >>>>>> >>>> from apache_beam.options.pipeline_options import PipelineOptions
> >>>>>> >>>> from apache_beam.portability import python_urns
> >>>>>> >>>> from apache_beam.portability.api import beam_runner_api_pb2
> >>>>>> >>>> from apache_beam.runners.portability import fn_api_runner
> >>>>>> >>>>
> >>>>>> >>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> >>>>>> >>>>
> >>>>>> >>>> file_pattern = 'part-r-00*'
> >>>>>> >>>> # Run each SDK worker as a subprocess (pre-2.18 way of configuring this).
> >>>>>> >>>> runner = fn_api_runner.FnApiRunner(
> >>>>>> >>>>     default_environment=beam_runner_api_pb2.Environment(
> >>>>>> >>>>         urn=python_urns.SUBPROCESS_SDK,
> >>>>>> >>>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> >>>>>> >>>>         % sys.executable.encode('ascii')))
> >>>>>> >>>>
> >>>>>> >>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
> >>>>>> >>>>
> >>>>>> >>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
> >>>>>> >>>>          | beam.combiners.Count.Globally()
> >>>>>> >>>>          | beam.io.WriteToText('/tmp/output'))
> >>>>>> >>>>
> >>>>>> >>>> p.run()
> >>>>>> >>>>
> >>>>>> >>>> Only one combination of apache_beam revision / worker type seems
> >>>>>> >>>> to work (I refer to https://beam.apache.org/documentation/runners/direct/
> >>>>>> >>>> for the worker types):
> >>>>>> >>>> * beam 2.16: neither multithread nor multiprocess mode achieves high
> >>>>>> >>>>   CPU usage on multiple cores
> >>>>>> >>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
> >>>>>> >>>> * beam 2.18: I have not tested the multithreaded mode, but the
> >>>>>> >>>>   multiprocess mode fails when trying to serialize the Environment
> >>>>>> >>>>   instance, most likely because of a change from 2.17 to 2.18.
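Julien expects counting to scale with cores because the 11 files are independent. As a point of comparison outside Beam, the sketch below counts records per file by walking the TFRecord framing (an 8-byte little-endian length, a 4-byte masked CRC of the length, the payload, and a 4-byte masked CRC of the payload) and fans the files out over a thread pool. It skips CRC verification, and `write_fake_records` is a hypothetical test helper that writes zeroed CRCs, so its files are not valid input for real TFRecord readers.

```python
import os
import struct
import tempfile
from concurrent.futures import ThreadPoolExecutor

HEADER = struct.Struct("<QI")  # uint64 length + uint32 masked CRC of the length


def count_records(path):
    """Count records in one TFRecord file by walking its framing.

    CRCs are not checked, and a payload truncated at end of file is not detected.
    """
    count = 0
    with open(path, "rb") as f:
        while True:
            header = f.read(HEADER.size)
            if not header:
                return count
            if len(header) < HEADER.size:
                raise ValueError(f"truncated record header in {path}")
            length, _length_crc = HEADER.unpack(header)
            f.seek(length + 4, os.SEEK_CUR)  # skip payload + uint32 masked CRC of data
            count += 1


def count_all(paths, max_workers=4):
    """Count records across files, one file per task."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(count_records, paths))


def write_fake_records(path, payloads):
    """Hypothetical test helper: TFRecord framing with zeroed CRCs."""
    with open(path, "wb") as f:
        for payload in payloads:
            f.write(HEADER.pack(len(payload), 0))
            f.write(payload)
            f.write(b"\x00" * 4)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        for i, n in enumerate([3, 5]):
            path = os.path.join(tmp, f"part-r-0000{i}")
            write_fake_records(path, [b"x" * k for k in range(n)])
            paths.append(path)
        print(count_all(paths))  # 8
```

A thread pool keeps the example simple; for CPU-heavy per-record parsing, a `concurrent.futures.ProcessPoolExecutor` would be the closer analogue of the DirectRunner's multiprocess mode.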
> >>>>>> >>>>
> >>>>>> >>>> I also briefly tried SparkRunner with version 2.16 but was not
> >>>>>> >>>> able to achieve any throughput.
> >>>>>> >>>>
> >>>>>> >>>> What is the recommended way to achieve what I am trying to do?
> >>>>>> >>>> How can I troubleshoot?
> >>>>>> >>>>
> >>>>>> >>> --
> >>>>>> >>> Please help me know how I am doing: go/hannahjiang-feedback
> >>>>>> >>> <https://goto.google.com/hannahjiang-feedback>