Hi Hannah,

I used top.
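In top, the '1' key toggles the per-core view. If it is useful as a cross-check, per-core usage can also be sampled from Python with psutil; a minimal sketch, assuming psutil is installed:

import psutil

# Sample CPU utilization per core over a one-second window.
per_core = psutil.cpu_percent(interval=1, percpu=True)
for core, pct in enumerate(per_core):
    print('core %d: %.1f%%' % (core, pct))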
Please let me know if you need any other information that could help me understand the issue.

J.

On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <hannahji...@google.com> wrote:
> Hi Julien,
>
> Thanks for reaching out to the user community. I will look into it. Can you
> please share how you checked CPU usage for each core?
>
> Thanks,
> Hannah
>
> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:
>> Hello,
>>
>> I have a set of tfrecord files, obtained by converting parquet files with
>> Spark. Each file is roughly 1GB and I have 11 of them.
>>
>> I would expect simple statistics gathering (i.e. counting the number of
>> items across all files) to scale linearly with the number of cores on my
>> system.
>>
>> I am able to reproduce the issue with the minimal snippet below:
>>
>> import apache_beam as beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.runners.portability import fn_api_runner
>> from apache_beam.portability.api import beam_runner_api_pb2
>> from apache_beam.portability import python_urns
>> import sys
>>
>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>>
>> file_pattern = 'part-r-00*'
>> runner = fn_api_runner.FnApiRunner(
>>     default_environment=beam_runner_api_pb2.Environment(
>>         urn=python_urns.SUBPROCESS_SDK,
>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>>                 % sys.executable.encode('ascii')))
>>
>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>>
>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>>            | beam.combiners.Count.Globally()
>>            | beam.io.WriteToText('/tmp/output'))
>>
>> p.run()
>>
>> Only one combination of apache_beam version / worker type seems to work
>> (I refer to https://beam.apache.org/documentation/runners/direct/ for
>> the worker types):
>> * beam 2.16: neither multithread nor multiprocess achieves high CPU usage
>> on multiple cores
>> * beam 2.17: able to achieve high CPU usage on all 4 cores
>> * beam 2.18: I have not tested the multithreaded mode, but the multiprocess
>> mode fails when trying to serialize the Environment instance, most likely
>> because of a change between 2.17 and 2.18.
>>
>> I also briefly tried the SparkRunner with version 2.16 but was not able to
>> achieve any throughput.
>>
>> What is the recommended way to achieve what I am trying to do? How can I
>> troubleshoot?
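PS: the DirectRunner page referenced above also describes a --direct_running_mode pipeline option (I have not verified which release introduced it) with values in_memory, multi_threading and multi_processing, which selects the worker type without constructing the Environment by hand. A sketch, assuming the option is available in the installed release:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumption: --direct_running_mode is supported by this Beam release.
options = PipelineOptions([
    '--direct_num_workers', '4',
    '--direct_running_mode', 'multi_processing',
])

# The context manager calls run() and waits for completion on exit.
with beam.Pipeline(options=options) as p:
    (p
     | 'read' >> beam.io.tfrecordio.ReadFromTFRecord('part-r-00*')
     | beam.combiners.Count.Globally()
     | beam.io.WriteToText('/tmp/output'))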