> I also tried briefly SparkRunner with version 2.16 but was no able to
achieve any throughput.

What do you mean by this?

On Wed, Jan 29, 2020 at 1:20 PM Julien Lafaye <[email protected]> wrote:

> I confirm the situation gets better after the commit: 4 cores used for 18
> seconds rather than one core used for 50 seconds.
>
> I still need to check whether this fixed the original issue which was with
> tensorflow_data_validation.
>
> But definitely a step for me in the right direction to understand the
> issue.
>
> On Wed, Jan 29, 2020 at 9:57 PM Robert Bradshaw <[email protected]>
> wrote:
>
>> This could influence how sources are read. When you say before/after
>> the commit, is it better now?
>>
>> On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye <[email protected]> wrote:
>> >
>> > I took some time to bisect my issue between v2.16.0 & v2.16.0 and it
>> looks like the commit below made a difference.
>> >
>> > before the commit: execution time is 50 seconds using the fn_api_runner
>> in multiprocess mode on 4 workers
>> > after the commit: execution time is 18 seconds using the fn_api_runner
>> in multiprocess mode on 4 workers
>> >
>> > commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
>> > Author: Robert Bradshaw <[email protected]>
>> > Date:   Fri Oct 18 15:33:10 2019 -0700
>> >
>> >    Make beam_fn_api opt-out rather than opt-in for runners.
>> >
>> >    Also delegate this decision to the runner, rather than checking the
>> string
>> >    value of the flag.
>> >
>> > I looked at the modifications done in this patch but none stroke me as
>> related with my issue.
>> >
>> > On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <[email protected]>
>> wrote:
>> >>
>> >> Hi Hannah,
>> >>
>> >> I used top.
>> >>
>> >> Please let me know if you need any other information that cloud help
>> me understand the issue.
>> >>
>> >> J.
>> >>
>> >> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <[email protected]>
>> wrote:
>> >>>
>> >>> Hi Julien
>> >>>
>> >>> Thanks for reaching out user community. I will look into it. Can you
>> please share how you checked CPU usage for each core?
>> >>>
>> >>> Thanks,
>> >>> Hannah
>> >>>
>> >>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <[email protected]>
>> wrote:
>> >>>>
>> >>>> Hello,
>> >>>>
>> >>>> I have a set of tfrecord files, obtained by converting parquet files
>> with Spark. Each file is roughly 1GB and I have 11 of those.
>> >>>>
>> >>>> I would expect simple statistics gathering (ie counting number of
>> items of all files) to scale linearly with respect to the number of cores
>> on my system.
>> >>>>
>> >>>> I am able to reproduce the issue with the minimal snippet below
>> >>>>
>> >>>> import apache_beam as beam
>> >>>> from apache_beam.options.pipeline_options import PipelineOptions
>> >>>> from apache_beam.runners.portability import fn_api_runner
>> >>>> from apache_beam.portability.api import beam_runner_api_pb2
>> >>>> from apache_beam.portability import python_urns
>> >>>> import sys
>> >>>>
>> >>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>> >>>>
>> >>>> file_pattern = 'part-r-00*
>> >>>> runner=fn_api_runner.FnApiRunner(
>> >>>>           default_environment=beam_runner_api_pb2.Environment(
>> >>>>               urn=python_urns.SUBPROCESS_SDK,
>> >>>>               payload=b'%s -m
>> apache_beam.runners.worker.sdk_worker_main'
>> >>>>                         % sys.executable.encode('ascii')))
>> >>>>
>> >>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>> >>>>
>> >>>> lines = (p | 'read' >>
>> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>> >>>>            | beam.combiners.Count.Globally()
>> >>>>            | beam.io.WriteToText('/tmp/output'))
>> >>>>
>> >>>> p.run()
>> >>>>
>> >>>> Only one combination of apache_beam revision / worker type seems to
>> work (I refer to https://beam.apache.org/documentation/runners/direct/
>> for the worker types)
>> >>>> * beam 2.16; neither multithread nor multiprocess achieve high cpu
>> usage on multiple cores
>> >>>> * beam 2.17: able to achieve high cpu usage on all 4 cores
>> >>>> * beam 2.18: not tested the mulithreaded mode but the multiprocess
>> mode fails when trying to serialize the Environment instance most likely
>> because of a change from 2.17 to 2.18.
>> >>>>
>> >>>> I also tried briefly SparkRunner with version 2.16 but was no able
>> to achieve any throughput.
>> >>>>
>> >>>> What is the recommnended way to achieve what I am trying to ? How
>> can I troubleshoot ?
>> >>>>
>> >>> --
>> >>> Please help me know how I am doing: go/hannahjiang-feedback
>> <https://goto.google.com/hannahjiang-feedback>
>>
>

Reply via email to