Yep, here is the Jira ticket: BEAM-9228
<https://issues.apache.org/jira/browse/BEAM-9228>
I just confirmed that the reshuffle operation at the following line is not
being called:
https://github.com/apache/beam/blob/release-2.19.0/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py#L926
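
To illustrate why the missing reshuffle matters, here is a toy model in plain Python. It is not Beam code: the helper names `split_restriction` and `assign_workers` are hypothetical, and the round-robin redistribution is an assumption standing in for whatever a real runner does. It only shows that when the split and process steps stay fused, every piece of the restriction remains on the worker that produced it.

```python
from collections import Counter
from itertools import cycle


def split_restriction(element, size, chunk):
    """Toy SplitRestriction: cut [0, size) into (element, start, stop) pieces."""
    return [(element, start, min(start + chunk, size))
            for start in range(0, size, chunk)]


def assign_workers(pieces, num_workers, reshuffle):
    """Count how many pieces each worker would process.

    Without a reshuffle the split and process steps are fused, so all
    pieces stay on the producing worker (worker 0 in this model). With a
    reshuffle, pieces are redistributed (round-robin here, as an assumption).
    """
    load = Counter()
    workers = cycle(range(num_workers)) if reshuffle else cycle([0])
    for _piece, worker in zip(pieces, workers):
        load[worker] += 1
    return dict(load)


pieces = split_restriction('part-r-00000', size=1000, chunk=100)
fused = assign_workers(pieces, num_workers=4, reshuffle=False)
shuffled = assign_workers(pieces, num_workers=4, reshuffle=True)
print(fused)     # every piece lands on a single worker
print(shuffled)  # pieces are spread over the four workers
```

In this model the fused case matches what was observed with top: one busy subprocess and idle workers.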


On Fri, Jan 31, 2020 at 9:43 AM Robert Bradshaw <rober...@google.com> wrote:

> It's only optionally inserting a reshuffle:
>
> https://github.com/apache/beam/blob/release-2.19.0/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py#L926
>
> We should at least have a fusion break (marking the downstream stage
> as must follows of the upstream) if reshuffle is not a primitive. We
> also clearly need a better test...I'd be happy to consult on either.
> Hannah, is there a JIRA for this yet?
>
> On Fri, Jan 31, 2020 at 8:41 AM Luke Cwik <lc...@google.com> wrote:
> >
> > Is the DirectRunner inserting a reshuffle or redistribute operation
> within the SplittableDoFn transform expansion so it looks like
> (PairWithRestriction -> SplitRestriction -> Reshuffle ->
> ProcessElementAndRestriction)?
> >
> >
> > On Thu, Jan 30, 2020 at 3:32 PM Hannah Jiang <hannahji...@google.com>
> wrote:
> >>
> >> Hi Julien
> >>
> >> Here are some more updates about the issue.
> >>
> >> When we run the DirectRunner in multiprocessing or multithreading mode,
> workers are created as expected. However, there are issue(s) with the
> _SDFBoundedSourceWrapper class, so some read transforms send data to a
> single worker instead of distributing it across workers. Therefore, when we
> check CPU usage, only one subprocess is working and the other workers are idle.
> >> Unless there are other transforms that require a reshuffle, the
> dataset is processed by a single worker, which is what happened in your
> pipeline.
> >>
> >> I tried a workaround, which is rolling back iobase.py so that it does not
> use the _SDFBoundedSourceWrapper class while keeping the other changes as
> they are. With this change, data is distributed to multiple workers; however,
> it causes regressions in the SDF wrapper tests, so it didn't work. I created
> a ticket for the issue.
> >>
> >> Thanks for reporting the issue.
> >> Hannah
> >>
> >>
> >> On Wed, Jan 29, 2020 at 4:33 PM Hannah Jiang <hannahji...@google.com>
> wrote:
> >>>
> >>> I have investigated some more.
> >>> The commit found by Julien above fixed it for 2.17.0, but another commit
> broke it again in 2.18, 2.19 (which will be released soon) and head.
> >>> The root cause hasn't been identified yet; I will keep working on it.
> >>>
> >>> So at the moment, only 2.17 works as expected.
> >>> Also, from 2.18 the environment setting for FnApiRunner was simplified;
> you can set the runner with the following code. The documentation will be
> updated soon.
> >>> -----------------------------------------------------------------------
> >>>
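> >>> import sys
> >>> from apache_beam.runners.portability import fn_api_runner
> >>> from apache_beam.transforms import environments
> >>>
> >>> # Command used to start each SDK worker as a subprocess.
> >>> command_string = '%s -m apache_beam.runners.worker.sdk_worker_main' % sys.executable
> >>> runner = fn_api_runner.FnApiRunner(
> >>>     default_environment=environments.SubprocessSDKEnvironment(
> >>>         command_string=command_string))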
> >>>
> >>> -------------------------------------------------
> >>>
> >>>
> >>>
> >>> On Wed, Jan 29, 2020 at 2:09 PM Kyle Weaver <kcwea...@google.com>
> wrote:
> >>>>
> >>>> > I also briefly tried SparkRunner with version 2.16 but was not able
> to achieve any throughput.
> >>>>
> >>>> What do you mean by this?
> >>>>
> >>>> On Wed, Jan 29, 2020 at 1:20 PM Julien Lafaye <jlaf...@gmail.com>
> wrote:
> >>>>>
> >>>>> I confirm the situation gets better after the commit: 4 cores used
> for 18 seconds rather than one core used for 50 seconds.
> >>>>>
> >>>>> I still need to check whether this fixed the original issue which
> was with tensorflow_data_validation.
> >>>>>
> >>>>> But definitely a step for me in the right direction to understand
> the issue.
> >>>>>
> >>>>> On Wed, Jan 29, 2020 at 9:57 PM Robert Bradshaw <rober...@google.com>
> wrote:
> >>>>>>
> >>>>>> This could influence how sources are read. When you say before/after
> >>>>>> the commit, is it better now?
> >>>>>>
> >>>>>> On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye <jlaf...@gmail.com>
> wrote:
> >>>>>> >
> >>>>>> > I took some time to bisect my issue between v2.16.0 and v2.17.0, and
> it looks like the commit below made a difference.
> >>>>>> >
> >>>>>> > before the commit: execution time is 50 seconds using the
> fn_api_runner in multiprocess mode on 4 workers
> >>>>>> > after the commit: execution time is 18 seconds using the
> fn_api_runner in multiprocess mode on 4 workers
> >>>>>> >
> >>>>>> > commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD,
> refs/bisect/new)
> >>>>>> > Author: Robert Bradshaw <rober...@gmail.com>
> >>>>>> > Date:   Fri Oct 18 15:33:10 2019 -0700
> >>>>>> >
> >>>>>> >    Make beam_fn_api opt-out rather than opt-in for runners.
> >>>>>> >
> >>>>>> >    Also delegate this decision to the runner, rather than
> checking the string
> >>>>>> >    value of the flag.
> >>>>>> >
> >>>>>> > I looked at the modifications made in this patch, but none struck
> me as related to my issue.
> >>>>>> >
> >>>>>> > On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <jlaf...@gmail.com>
> wrote:
> >>>>>> >>
> >>>>>> >> Hi Hannah,
> >>>>>> >>
> >>>>>> >> I used top.
> >>>>>> >>
> >>>>>> >> Please let me know if you need any other information that could
> help me understand the issue.
> >>>>>> >>
> >>>>>> >> J.
> >>>>>> >>
> >>>>>> >> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <
> hannahji...@google.com> wrote:
> >>>>>> >>>
> >>>>>> >>> Hi Julien
> >>>>>> >>>
> >>>>>> >>> Thanks for reaching out to the user community. I will look into it.
> Can you please share how you checked CPU usage for each core?
> >>>>>> >>>
> >>>>>> >>> Thanks,
> >>>>>> >>> Hannah
> >>>>>> >>>
> >>>>>> >>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <
> jlaf...@gmail.com> wrote:
> >>>>>> >>>>
> >>>>>> >>>> Hello,
> >>>>>> >>>>
> >>>>>> >>>> I have a set of tfrecord files, obtained by converting parquet
> files with Spark. Each file is roughly 1GB and I have 11 of those.
> >>>>>> >>>>
> >>>>>> >>>> I would expect simple statistics gathering (i.e. counting the
> number of items in all files) to scale linearly with respect to the number of
> cores on my system.
> >>>>>> >>>>
> >>>>>> >>>> I am able to reproduce the issue with the minimal snippet below
> >>>>>> >>>>
> >>>>>> >>>> import apache_beam as beam
> >>>>>> >>>> from apache_beam.options.pipeline_options import PipelineOptions
> >>>>>> >>>> from apache_beam.runners.portability import fn_api_runner
> >>>>>> >>>> from apache_beam.portability.api import beam_runner_api_pb2
> >>>>>> >>>> from apache_beam.portability import python_urns
> >>>>>> >>>> import sys
> >>>>>> >>>>
> >>>>>> >>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> >>>>>> >>>>
> >>>>>> >>>> file_pattern = 'part-r-00*'
> >>>>>> >>>> # Start each SDK worker as a subprocess of the runner.
> >>>>>> >>>> runner = fn_api_runner.FnApiRunner(
> >>>>>> >>>>     default_environment=beam_runner_api_pb2.Environment(
> >>>>>> >>>>         urn=python_urns.SUBPROCESS_SDK,
> >>>>>> >>>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> >>>>>> >>>>                 % sys.executable.encode('ascii')))
> >>>>>> >>>>
> >>>>>> >>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
> >>>>>> >>>>
> >>>>>> >>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
> >>>>>> >>>>            | beam.combiners.Count.Globally()
> >>>>>> >>>>            | beam.io.WriteToText('/tmp/output'))
> >>>>>> >>>>
> >>>>>> >>>> p.run()
> >>>>>> >>>>
> >>>>>> >>>> Only one combination of apache_beam revision / worker type
> seems to work (I refer to
> https://beam.apache.org/documentation/runners/direct/ for the worker
> types):
> >>>>>> >>>> * beam 2.16: neither multithread nor multiprocess achieves high
> cpu usage on multiple cores
> >>>>>> >>>> * beam 2.17: able to achieve high cpu usage on all 4 cores
> >>>>>> >>>> * beam 2.18: I have not tested the multithreaded mode, but the
> multiprocess mode fails when trying to serialize the Environment instance,
> most likely because of a change from 2.17 to 2.18.
> >>>>>> >>>>
> >>>>>> >>>> I also briefly tried SparkRunner with version 2.16 but was not
> able to achieve any throughput.
> >>>>>> >>>>
> >>>>>> >>>> What is the recommended way to achieve what I am trying to do?
> How can I troubleshoot?
> >>>>>> >>>>
> >>>>>> >>> --
> >>>>>> >>> Please help me know how I am doing: go/hannahjiang-feedback
> <https://goto.google.com/hannahjiang-feedback>
>
