This could influence how sources are read. When you say before/after
the commit, is it better now?
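
One way to separate raw read cost from runner overhead is a Beam-free
baseline that counts records directly. Below is a minimal sketch, assuming
uncompressed files in the standard TFRecord framing (8-byte little-endian
length, 4-byte CRC of the length, payload, 4-byte CRC of the payload).
count_records and count_all are hypothetical helper names, not Beam APIs,
and the CRCs are skipped rather than verified.

```python
import glob
import struct
import time
from multiprocessing import Pool


def count_records(path):
    """Count records in one uncompressed TFRecord file by walking headers."""
    count = 0
    with open(path, 'rb') as f:
        while True:
            header = f.read(12)  # 8-byte length + 4-byte CRC of the length
            if len(header) < 12:
                break  # end of file (or a truncated trailing header)
            (length,) = struct.unpack('<Q', header[:8])
            f.seek(length + 4, 1)  # skip the payload and its 4-byte CRC
            count += 1
    return count


def count_all(file_pattern, processes=4):
    """Count records across all matching files, one file per task."""
    paths = sorted(glob.glob(file_pattern))
    if not paths:
        return 0
    with Pool(processes) as pool:
        return sum(pool.map(count_records, paths))


if __name__ == '__main__':
    start = time.perf_counter()
    # Same file pattern as in the snippet quoted below.
    total = count_all('part-r-00*', processes=4)
    print('%d records in %.1f s' % (total, time.perf_counter() - start))
```

If this baseline scales with the number of processes while the pipeline
does not, the bottleneck is more likely in how the runner splits or
schedules the source than in disk throughput.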

On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye <jlaf...@gmail.com> wrote:
>
> I took some time to bisect my issue between v2.16.0 & v2.17.0 and it looks 
> like the commit below made a difference.
>
> before the commit: execution time is 50 seconds using the fn_api_runner in 
> multiprocess mode on 4 workers
> after the commit: execution time is 18 seconds using the fn_api_runner in 
> multiprocess mode on 4 workers
>
> commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
> Author: Robert Bradshaw <rober...@gmail.com>
> Date:   Fri Oct 18 15:33:10 2019 -0700
>
>    Make beam_fn_api opt-out rather than opt-in for runners.
>
>    Also delegate this decision to the runner, rather than checking the string
>    value of the flag.
>
> I looked at the modifications done in this patch but none struck me as 
> related to my issue.
>
> On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <jlaf...@gmail.com> wrote:
>>
>> Hi Hannah,
>>
>> I used top.
>>
>> Please let me know if you need any other information that could help me 
>> understand the issue.
>>
>> J.
>>
>> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <hannahji...@google.com> wrote:
>>>
>>> Hi Julien
>>>
>>> Thanks for reaching out to the user community. I will look into it. Can 
>>> you please share how you checked CPU usage for each core?
>>>
>>> Thanks,
>>> Hannah
>>>
>>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I have a set of tfrecord files, obtained by converting parquet files with 
>>>> Spark. Each file is roughly 1GB and I have 11 of those.
>>>>
>>>> I would expect simple statistics gathering (i.e., counting the number of 
>>>> items across all files) to scale linearly with respect to the number of 
>>>> cores on my system.
>>>>
>>>> I am able to reproduce the issue with the minimal snippet below:
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.runners.portability import fn_api_runner
>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>> from apache_beam.portability import python_urns
>>>> import sys
>>>>
>>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>>>>
>>>> file_pattern = 'part-r-00*'
>>>> runner = fn_api_runner.FnApiRunner(
>>>>           default_environment=beam_runner_api_pb2.Environment(
>>>>               urn=python_urns.SUBPROCESS_SDK,
>>>>               payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>>>>                         % sys.executable.encode('ascii')))
>>>>
>>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>>>>
>>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>>>>            | beam.combiners.Count.Globally()
>>>>            | beam.io.WriteToText('/tmp/output'))
>>>>
>>>> p.run()
>>>>
>>>> Only one combination of apache_beam revision / worker type seems to work 
>>>> (I refer to https://beam.apache.org/documentation/runners/direct/ for the 
>>>> worker types)
>>>> * beam 2.16: neither multithread nor multiprocess achieves high CPU usage 
>>>> on multiple cores
>>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
>>>> * beam 2.18: not tested the multithreaded mode but the multiprocess mode 
>>>> fails when trying to serialize the Environment instance, most likely 
>>>> because of a change from 2.17 to 2.18.
>>>>
>>>> I also briefly tried the SparkRunner with version 2.16 but was not able to 
>>>> achieve any throughput.
>>>>
>>>> What is the recommended way to achieve what I am trying to do? How can I 
>>>> troubleshoot this?
>>>>
>>> --
>>> Please help me know how I am doing: go/hannahjiang-feedback
