> I also briefly tried the SparkRunner with version 2.16 but was not able to achieve any throughput.
What do you mean by this?

On Wed, Jan 29, 2020 at 1:20 PM Julien Lafaye wrote:
> I confirm the situation gets better after the commit: 4 cores used for 18
> seconds rather than one core used for 50 seconds.
>
> I still need to check whether this fixed the original issue, which was with
> tensorflow_data_validation.
>
> But it is definitely a step in the right direction for me to understand the
> issue.
>
> On Wed, Jan 29, 2020 at 9:57 PM Robert Bradshaw wrote:
>
>> This could influence how sources are read. When you say before/after
>> the commit, is it better now?
>>
>> On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye wrote:
>> >
>> > I took some time to bisect my issue between v2.16.0 and v2.17.0, and it
>> > looks like the commit below made a difference.
>> >
>> > before the commit: execution time is 50 seconds using the fn_api_runner
>> > in multiprocess mode on 4 workers
>> > after the commit: execution time is 18 seconds using the fn_api_runner
>> > in multiprocess mode on 4 workers
>> >
>> > commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD, refs/bisect/new)
>> > Author: Robert Bradshaw
>> > Date:   Fri Oct 18 15:33:10 2019 -0700
>> >
>> >     Make beam_fn_api opt-out rather than opt-in for runners.
>> >
>> >     Also delegate this decision to the runner, rather than checking the
>> >     string value of the flag.
>> >
>> > I looked at the modifications made in this patch, but none struck me as
>> > related to my issue.
>> >
>> > On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye wrote:
>> >>
>> >> Hi Hannah,
>> >>
>> >> I used top.
>> >>
>> >> Please let me know if you need any other information that could help
>> >> me understand the issue.
>> >>
>> >> J.
>> >>
>> >> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang wrote:
>> >>>
>> >>> Hi Julien,
>> >>>
>> >>> Thanks for reaching out to the user community. I will look into it.
>> >>> Can you please share how you checked CPU usage for each core?
>> >>>
>> >>> Thanks,
>> >>> Hannah
>> >>>
>> >>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye wrote:
>> >>>>
>> >>>> Hello,
>> >>>>
>> >>>> I have a set of TFRecord files, obtained by converting Parquet files
>> >>>> with Spark. Each file is roughly 1 GB and I have 11 of them.
>> >>>>
>> >>>> I would expect simple statistics gathering (i.e. counting the number of
>> >>>> items across all files) to scale linearly with the number of cores
>> >>>> on my system.
>> >>>>
>> >>>> I am able to reproduce the issue with the minimal snippet below:
>> >>>>
>> >>>> import apache_beam as beam
>> >>>> from apache_beam.options.pipeline_options import PipelineOptions
>> >>>> from apache_beam.runners.portability import fn_api_runner
>> >>>> from apache_beam.portability.api import beam_runner_api_pb2
>> >>>> from apache_beam.portability import python_urns
>> >>>> import sys
>> >>>>
>> >>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>> >>>>
>> >>>> file_pattern = 'part-r-00*'
>> >>>>
>> >>>> # Run each SDK worker as a separate Python subprocess.
>> >>>> runner = fn_api_runner.FnApiRunner(
>> >>>>     default_environment=beam_runner_api_pb2.Environment(
>> >>>>         urn=python_urns.SUBPROCESS_SDK,
>> >>>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>> >>>>         % sys.executable.encode('ascii')))
>> >>>>
>> >>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>> >>>>
>> >>>> # Count all records across the matched files and write the total.
>> >>>> lines = (p
>> >>>>          | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>> >>>>          | beam.combiners.Count.Globally()
>> >>>>          | beam.io.WriteToText('/tmp/output'))
>> >>>>
>> >>>> p.run()
>> >>>>
>> >>>> Only one combination of apache_beam version and worker type seems to
>> >>>> work (I refer to https://beam.apache.org/documentation/runners/direct/
>> >>>> for the worker types):
>> >>>> * beam 2.16: neither multithreaded nor multiprocess mode achieves high
>> >>>>   CPU usage on multiple cores
>> >>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
>> >>>> * beam 2.18: I have not tested the multithreaded mode, but the
>> >>>>   multiprocess mode fails when trying to serialize the Environment
>> >>>>   instance, most likely because of a change from 2.17 to 2.18.
>> >>>>
>> >>>> I also briefly tried the SparkRunner with version 2.16 but was not able
>> >>>> to achieve any throughput.
>> >>>>
>> >>>> What is the recommended way to achieve what I am trying to do? How
>> >>>> can I troubleshoot?
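To check the expectation that counting items should scale with the number of cores independently of Beam, one option is a plain-Python baseline that counts records per file and fans the 11 files out over a worker pool. This is a minimal sketch, not how Beam's ReadFromTFRecord works internally. It assumes the files are uncompressed and use the standard TFRecord framing (a little-endian uint64 length, a 4-byte CRC of the length, the payload, and a 4-byte CRC of the payload), and it skips the CRCs rather than verifying them. The names count_tfrecords, count_files_in_parallel and write_fake_tfrecords are hypothetical.

```python
import os
import struct
import tempfile
from concurrent.futures import ThreadPoolExecutor


def count_tfrecords(path):
    """Count records in one TFRecord file by walking its framing.

    Assumed layout per record (little-endian): uint64 length,
    uint32 masked CRC of the length, `length` payload bytes,
    uint32 masked CRC of the payload. CRCs are skipped, NOT verified.
    """
    size = os.path.getsize(path)
    count = 0
    with open(path, "rb") as f:
        while True:
            header = f.read(8)
            if not header:
                return count  # clean end of file
            if len(header) < 8:
                raise ValueError(f"truncated length field in {path}")
            (length,) = struct.unpack("<Q", header)
            # Jump over the length CRC, the payload and the payload CRC.
            f.seek(4 + length + 4, os.SEEK_CUR)
            if f.tell() > size:
                raise ValueError(f"truncated record in {path}")
            count += 1


def count_files_in_parallel(paths, executor):
    """Count each file independently on the executor and sum the totals."""
    return sum(executor.map(count_tfrecords, paths))


def write_fake_tfrecords(path, payloads):
    """Hypothetical test helper: TFRecord framing with placeholder zero CRCs."""
    with open(path, "wb") as f:
        for data in payloads:
            f.write(struct.pack("<Q", len(data)))
            f.write(b"\x00" * 4)  # placeholder length CRC
            f.write(data)
            f.write(b"\x00" * 4)  # placeholder payload CRC


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        for i in range(3):
            path = os.path.join(tmp, f"part-r-{i:05d}")
            write_fake_tfrecords(path, [b"x" * n for n in range(i + 2)])
            paths.append(path)
        with ThreadPoolExecutor(max_workers=4) as pool:
            print(count_files_in_parallel(paths, pool))  # prints 9 (2 + 3 + 4)
```

The demo uses a ThreadPoolExecutor because this loop mostly seeks rather than computes. Passing a concurrent.futures.ProcessPoolExecutor(max_workers=4) instead, created under the `if __name__ == "__main__":` guard, gives one process per core, which is closer to the multiprocess mode discussed in the thread.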

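The timings Julien reported after the bisect (one core busy for 50 seconds before the commit, four cores busy for 18 seconds after) can be turned into the usual scaling metrics. The short sketch below assumes the cores were fully busy for the whole run, as read from top; scaling_summary is a hypothetical helper name.

```python
def scaling_summary(serial_s, parallel_s, cores):
    """Compare a 1-core run with an N-core run (wall-clock seconds).

    Assumes every core counted is fully busy for the whole run.
    """
    speedup = serial_s / parallel_s
    return {
        "ideal_parallel_s": serial_s / cores,  # perfect linear scaling
        "speedup": speedup,
        "efficiency": speedup / cores,  # 1.0 means linear scaling
        "serial_core_s": serial_s * 1,  # CPU work of the 1-core run
        "parallel_core_s": parallel_s * cores,  # CPU work of the N-core run
    }


summary = scaling_summary(50.0, 18.0, 4)
print(f"ideal {summary['ideal_parallel_s']:.1f} s, "
      f"speedup {summary['speedup']:.2f}x, "
      f"efficiency {summary['efficiency']:.0%}, "
      f"core-seconds {summary['serial_core_s']:.0f} -> "
      f"{summary['parallel_core_s']:.0f}")
# prints: ideal 12.5 s, speedup 2.78x, efficiency 69%, core-seconds 50 -> 72
```

Under that assumption, the post-commit run is about 2.8 times faster but spends roughly 72 core-seconds instead of 50, i.e. about 69% parallel efficiency, so it is still some way from the linear scaling expected in the original question.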