Hi Julien,

Thanks for reaching out to the user community. I will look into it. Could you
please share how you checked the CPU usage for each core?
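
For reference, one quick way to sample per-core utilization from Python while
the pipeline is running is a small sketch like the one below (this assumes
psutil is installed; top, htop, or mpstat -P ALL work just as well):

import psutil

# Print one utilization percentage per logical core, once per second, for
# ten seconds; consistently uneven numbers suggest the work is not being
# spread across cores.
for _ in range(10):
    print(psutil.cpu_percent(interval=1, percpu=True))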

Thanks,
Hannah

On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:

> Hello,
>
> I have a set of TFRecord files, obtained by converting Parquet files with
> Spark. Each file is roughly 1 GB, and I have 11 of them.
>
> I would expect simple statistics gathering (i.e., counting the number of
> items across all files) to scale linearly with the number of cores on my
> system.
>
> I am able to reproduce the issue with the minimal snippet below:
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
>
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>           default_environment=beam_runner_api_pb2.Environment(
>               urn=python_urns.SUBPROCESS_SDK,
>               payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>                         % sys.executable.encode('ascii')))
>
> p = beam.Pipeline(runner=runner, options=pipeline_options)
>
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>            | beam.combiners.Count.Globally()
>            | beam.io.WriteToText('/tmp/output'))
>
> p.run()
>
> Only one combination of apache_beam version / worker type seems to work
> (I am referring to https://beam.apache.org/documentation/runners/direct/
> for the worker types):
> * beam 2.16: neither the multithreaded nor the multiprocess mode achieves
> high CPU usage on multiple cores
> * beam 2.17: able to achieve high CPU usage on all 4 cores
> * beam 2.18: I did not test the multithreaded mode, but the multiprocess
> mode fails when trying to serialize the Environment instance, most likely
> because of a change from 2.17 to 2.18 (see the configuration sketch after
> this list).
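>
> For comparison, the same pipeline driven purely through pipeline options,
> using the direct_running_mode setting documented on that page, would look
> roughly like the sketch below (this assumes a Beam release that supports
> direct_running_mode, which may be newer than 2.18):
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # Multi-process DirectRunner configuration via options only; no manual
> # Environment construction, so nothing needs to be serialized by hand.
> pipeline_options = PipelineOptions([
>     '--direct_num_workers', '4',
>     '--direct_running_mode', 'multi_processing',
> ])
>
> file_pattern = 'part-r-00*'
>
> with beam.Pipeline(options=pipeline_options) as p:
>     _ = (p
>          | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>          | beam.combiners.Count.Globally()
>          | beam.io.WriteToText('/tmp/output'))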
>
> I also briefly tried the SparkRunner with version 2.16 but was not able to
> achieve any throughput.
>
> What is the recommended way to achieve what I am trying to do? How can I
> troubleshoot this?
>
> --
Please let me know how I am doing: go/hannahjiang-feedback
