I took some time to bisect my issue between v2.16.0 & v2.17.0 and it looks
like the commit below made a difference.

before the commit: execution time is 50 seconds using the fn_api_runner in
multiprocess mode on 4 workers
after the commit: execution time is 18 seconds using the fn_api_runner in
multiprocess mode on 4 workers
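
To put those two timings in perspective, here is a quick sketch of the arithmetic. The helper names are mine, not part of Beam, and the efficiency figure rests on the assumption that the 50-second run was effectively using a single core, which I have not verified.

```python
def speedup(before_s, after_s):
    """Return how many times faster the run became."""
    return before_s / after_s


def parallel_efficiency(serial_s, parallel_s, workers):
    """Return the fraction of ideal linear scaling reached on `workers` workers."""
    return serial_s / (parallel_s * workers)


before_s, after_s, workers = 50.0, 18.0, 4

print(f"speedup: {speedup(before_s, after_s):.2f}x")

# Assumption: the 50 s run is treated as the single-core baseline.
efficiency = parallel_efficiency(before_s, after_s, workers)
print(f"efficiency on {workers} workers: {efficiency:.0%}")
```

So the commit gives a bit under a 3x improvement, still short of the 4x that perfectly linear scaling on 4 workers would give.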

commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
Author: Robert Bradshaw <rober...@gmail.com>
Date:   Fri Oct 18 15:33:10 2019 -0700

   Make beam_fn_api opt-out rather than opt-in for runners.

   Also delegate this decision to the runner, rather than checking the
   string value of the flag.

I looked at the modifications made in this patch, but none struck me as
related to my issue.
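
In case anyone wants to repeat the bisection, below is a rough sketch of a timing helper that could be driven by `git bisect run`. It is not the exact script I used. The script name `count_records.py` and the 30-second cut-off are hypothetical; the cut-off is simply a value between the two timings above. Since the fast behaviour is the "new" one here, a fast run maps to exit code 1 (new) and a slow run to exit code 0 (old); 125 asks git to skip a commit where the pipeline fails outright.

```python
import subprocess
import sys
import time

# Hypothetical values: adjust the script name and the cut-off to your setup.
THRESHOLD_S = 30.0
PIPELINE_CMD = [sys.executable, "count_records.py"]


def bisect_exit_code(elapsed_s, threshold_s=THRESHOLD_S):
    """Map a wall-clock time to a `git bisect run` exit code.

    0 marks the commit as old (slow), 1 marks it as new (fast).
    """
    return 1 if elapsed_s < threshold_s else 0


def time_command(cmd):
    """Run `cmd` and return (elapsed seconds, process return code)."""
    start = time.monotonic()
    result = subprocess.run(cmd)
    return time.monotonic() - start, result.returncode


def main():
    elapsed, returncode = time_command(PIPELINE_CMD)
    if returncode != 0:
        return 125  # the pipeline itself failed: ask git bisect to skip
    print(f"pipeline took {elapsed:.1f}s", file=sys.stderr)
    return bisect_exit_code(elapsed)

# To use this as a script, finish the file with: sys.exit(main())
```

Saved as, say, `time_pipeline.py`, it would be run as `git bisect run python time_pipeline.py` after marking one old and one new revision.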

On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <jlaf...@gmail.com> wrote:

> Hi Hannah,
>
> I used top.
>
> Please let me know if you need any other information that could help me
> understand the issue.
>
> J.
>
> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <hannahji...@google.com>
> wrote:
>
>> Hi Julien
>>
>> Thanks for reaching out to the user community. I will look into it. Can you
>> please share how you checked CPU usage for each core?
>>
>> Thanks,
>> Hannah
>>
>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a set of tfrecord files, obtained by converting parquet files
>>> with Spark. Each file is roughly 1GB and I have 11 of those.
>>>
>>> I would expect simple statistics gathering (i.e., counting the number of
>>> items in all files) to scale linearly with respect to the number of cores
>>> on my system.
>>>
>>> I am able to reproduce the issue with the minimal snippet below:
>>>
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.runners.portability import fn_api_runner
>>> from apache_beam.portability.api import beam_runner_api_pb2
>>> from apache_beam.portability import python_urns
>>> import sys
>>>
>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>>>
>>> file_pattern = 'part-r-00*'
>>> runner=fn_api_runner.FnApiRunner(
>>>           default_environment=beam_runner_api_pb2.Environment(
>>>               urn=python_urns.SUBPROCESS_SDK,
>>>               payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>>>                         % sys.executable.encode('ascii')))
>>>
>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>>>
>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>>>            | beam.combiners.Count.Globally()
>>>            | beam.io.WriteToText('/tmp/output'))
>>>
>>> p.run()
>>>
>>> Only one combination of apache_beam revision / worker type seems to work
>>> (I refer to https://beam.apache.org/documentation/runners/direct/ for
>>> the worker types)
>>> * beam 2.16: neither multithread nor multiprocess achieves high CPU usage
>>> on multiple cores
>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
>>> * beam 2.18: not tested the multithreaded mode, but the multiprocess mode
>>> fails when trying to serialize the Environment instance, most likely
>>> because of a change from 2.17 to 2.18.
>>>
>>> I also briefly tried SparkRunner with version 2.16 but was not able to
>>> achieve any throughput.
>>>
>>> What is the recommended way to achieve what I am trying to do? How can I
>>> troubleshoot this?
>>>
>>> --
>> Please help me know how I am doing: go/hannahjiang-feedback
>>
>
