This could influence how sources are read. When you say before/after the commit, is it better now?
On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye <jlaf...@gmail.com> wrote:
>
> I took some time to bisect my issue between v2.16.0 & v2.17.0 and it looks
> like the commit below made a difference.
>
> before the commit: execution time is 50 seconds using the fn_api_runner in
> multiprocess mode on 4 workers
> after the commit: execution time is 18 seconds using the fn_api_runner in
> multiprocess mode on 4 workers
>
> commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
> Author: Robert Bradshaw <rober...@gmail.com>
> Date:   Fri Oct 18 15:33:10 2019 -0700
>
>     Make beam_fn_api opt-out rather than opt-in for runners.
>
>     Also delegate this decision to the runner, rather than checking the
>     string value of the flag.
>
> I looked at the modifications done in this patch but none struck me as
> related to my issue.
>
> On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <jlaf...@gmail.com> wrote:
>>
>> Hi Hannah,
>>
>> I used top.
>>
>> Please let me know if you need any other information that could help me
>> understand the issue.
>>
>> J.
>>
>> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <hannahji...@google.com> wrote:
>>>
>>> Hi Julien,
>>>
>>> Thanks for reaching out to the user community. I will look into it. Can
>>> you please share how you checked CPU usage for each core?
>>>
>>> Thanks,
>>> Hannah
>>>
>>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I have a set of tfrecord files, obtained by converting parquet files
>>>> with Spark. Each file is roughly 1GB and I have 11 of those.
>>>>
>>>> I would expect simple statistics gathering (i.e. counting the number
>>>> of items across all files) to scale linearly with respect to the
>>>> number of cores on my system.
>>>>
>>>> I am able to reproduce the issue with the minimal snippet below:
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from apache_beam.runners.portability import fn_api_runner
>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>> from apache_beam.portability import python_urns
>>>> import sys
>>>>
>>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>>>>
>>>> file_pattern = 'part-r-00*'
>>>> runner = fn_api_runner.FnApiRunner(
>>>>     default_environment=beam_runner_api_pb2.Environment(
>>>>         urn=python_urns.SUBPROCESS_SDK,
>>>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>>>>                 % sys.executable.encode('ascii')))
>>>>
>>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>>>>
>>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>>>>            | beam.combiners.Count.Globally()
>>>>            | beam.io.WriteToText('/tmp/output'))
>>>>
>>>> p.run()
>>>>
>>>> Only one combination of apache_beam revision / worker type seems to
>>>> work (I refer to https://beam.apache.org/documentation/runners/direct/
>>>> for the worker types):
>>>> * beam 2.16: neither multithreaded nor multiprocess mode achieves high
>>>>   CPU usage on multiple cores
>>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
>>>> * beam 2.18: did not test the multithreaded mode, but the multiprocess
>>>>   mode fails when trying to serialize the Environment instance, most
>>>>   likely because of a change from 2.17 to 2.18.
>>>>
>>>> I also briefly tried the SparkRunner with version 2.16 but was not
>>>> able to achieve any throughput.
>>>>
>>>> What is the recommended way to achieve what I am trying to do? How can
>>>> I troubleshoot?