I took some time to bisect my issue between v2.16.0 & v2.17.0 and it looks like the commit below made a difference.
before the commit: execution time is 50 seconds using the fn_api_runner in multiprocess mode on 4 workers
after the commit: execution time is 18 seconds using the fn_api_runner in multiprocess mode on 4 workers

commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
Author: Robert Bradshaw <rober...@gmail.com>
Date:   Fri Oct 18 15:33:10 2019 -0700

    Make beam_fn_api opt-out rather than opt-in for runners.

    Also delegate this decision to the runner, rather than checking the
    string value of the flag.

I looked at the modifications done in this patch but none struck me as related to my issue. Since the commit only flips the beam_fn_api experiment from opt-in to opt-out, I have included a sketch of opting in explicitly on 2.16 below, after the quoted thread.

On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <jlaf...@gmail.com> wrote:

> Hi Hannah,
>
> I used top.
>
> Please let me know if you need any other information that could help me
> understand the issue.
>
> J.
>
> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <hannahji...@google.com>
> wrote:
>
>> Hi Julien,
>>
>> Thanks for reaching out to the user community. I will look into it. Can
>> you please share how you checked CPU usage for each core?
>>
>> Thanks,
>> Hannah
>>
>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a set of tfrecord files, obtained by converting parquet files
>>> with Spark. Each file is roughly 1GB and I have 11 of those.
>>>
>>> I would expect simple statistics gathering (i.e. counting the number
>>> of items across all files) to scale linearly with respect to the number
>>> of cores on my system.
>>>
>>> I am able to reproduce the issue with the minimal snippet below.
>>>
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.runners.portability import fn_api_runner
>>> from apache_beam.portability.api import beam_runner_api_pb2
>>> from apache_beam.portability import python_urns
>>> import sys
>>>
>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>>>
>>> file_pattern = 'part-r-00*'
>>> runner = fn_api_runner.FnApiRunner(
>>>     default_environment=beam_runner_api_pb2.Environment(
>>>         urn=python_urns.SUBPROCESS_SDK,
>>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>>>                 % sys.executable.encode('ascii')))
>>>
>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>>>
>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>>>          | beam.combiners.Count.Globally()
>>>          | beam.io.WriteToText('/tmp/output'))
>>>
>>> p.run()
>>>
>>> Only one combination of apache_beam revision / worker type seems to
>>> work (I refer to https://beam.apache.org/documentation/runners/direct/
>>> for the worker types):
>>> * beam 2.16: neither multithread nor multiprocess mode achieves high
>>> CPU usage on multiple cores
>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
>>> * beam 2.18: I have not tested the multithreaded mode, but the
>>> multiprocess mode fails when trying to serialize the Environment
>>> instance, most likely because of a change from 2.17 to 2.18
>>>
>>> I also briefly tried the SparkRunner with version 2.16 but was not able
>>> to achieve any throughput.
>>>
>>> What is the recommended way to achieve what I am trying to do? How can
>>> I troubleshoot?
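Here is the sketch mentioned above: a minimal, untested variant of the quoted snippet for 2.16, where the fn_api code path was still opt-in. The '--experiments beam_fn_api' flag is an assumption on my part, based only on the commit message above; everything else is unchanged from the quoted snippet.

import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

# Same multiprocess setup as in the quoted thread, plus an explicit opt-in
# to the fn_api code path (assumed experiment name: 'beam_fn_api').
pipeline_options = PipelineOptions([
    '--direct_num_workers', '4',
    '--experiments', 'beam_fn_api',
])

runner = fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
        urn=python_urns.SUBPROCESS_SDK,
        payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
                % sys.executable.encode('ascii')))

# Running the pipeline inside a context manager waits for completion.
with beam.Pipeline(runner=runner, options=pipeline_options) as p:
    (p
     | 'read' >> beam.io.tfrecordio.ReadFromTFRecord('part-r-00*')
     | beam.combiners.Count.Globally()
     | beam.io.WriteToText('/tmp/output'))

If the timing with this flag on 2.16 matches the post-commit 18 seconds, that would confirm the bisect result.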