[ 
https://issues.apache.org/jira/browse/BEAM-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hannah Jiang closed BEAM-9228.
------------------------------

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> --------------------------------------------------------------------
>
>                 Key: BEAM-9228
>                 URL: https://issues.apache.org/jira/browse/BEAM-9228
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.16.0, 2.18.0, 2.19.0
>            Reporter: Hannah Jiang
>            Assignee: Hannah Jiang
>            Priority: Major
>             Fix For: 2.20.0
>
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -------------------------------------------------
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (i.e. counting the number of items 
> in all files) to scale linearly with the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below:
> {code:python}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>           default_environment=beam_runner_api_pb2.Environment(
>               urn=python_urns.SUBPROCESS_SDK,
>               payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>                         % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>                  | beam.combiners.Count.Globally()
>                  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types):
> * beam 2.16: neither multithreaded nor multiprocess mode achieves high CPU 
> usage on multiple cores
> * beam 2.17: able to achieve high CPU usage on all 4 cores
> * beam 2.18: I have not tested the multithreaded mode, but the multiprocess 
> mode fails when trying to serialize the Environment instance, most likely 
> because of a change from 2.17 to 2.18.
> I also briefly tried SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot this?
> ----------------------------------------------------------------------------------------------------------------------------------------------
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] was tried: rolling 
> back iobase.py so that it does not use _SDFBoundedSourceWrapper. With the 
> rollback, data is distributed to multiple workers; however, there are some 
> regressions in the SDF wrapper tests.
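
For readers unfamiliar with what "distributing data to multiple workers" means for a bounded source, here is a minimal, Beam-independent sketch of the general idea: a byte range is split into bundles, and the bundles are spread across workers. It is an illustration only. `split_range` and `assign_round_robin` are hypothetical names, not Beam APIs, and this is not how _SDFBoundedSourceWrapper or the FnApiRunner is implemented.

```python
def split_range(start, stop, desired_bundle_size):
    """Yield (start, stop) sub-ranges that together cover [start, stop).

    Hypothetical helper: mimics the idea of splitting a bounded source
    into bundles of roughly `desired_bundle_size` units each.
    """
    if desired_bundle_size <= 0:
        raise ValueError("desired_bundle_size must be positive")
    pos = start
    while pos < stop:
        end = min(pos + desired_bundle_size, stop)
        yield (pos, end)
        pos = end


def assign_round_robin(bundles, num_workers):
    """Assign bundles to worker indices 0..num_workers-1 in round-robin order."""
    assignments = {worker: [] for worker in range(num_workers)}
    for i, bundle in enumerate(bundles):
        assignments[i % num_workers].append(bundle)
    return assignments


# Assumed example values: a 1000-unit source, 300-unit bundles, 4 workers.
bundles = list(split_range(0, 1000, 300))
assignments = assign_round_robin(bundles, 4)
for worker, worker_bundles in assignments.items():
    print(worker, worker_bundles)
```

If a source is never split, there is only one bundle, so only one worker has anything to read regardless of how many workers are configured. That is consistent with the symptom in this report, but it is not a confirmed description of the root cause.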



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
