Hey Robbe,

The issue with higher parallelism is likely due to the single Python
process which processes all of the data.

You may want to use the `sdk_worker_parallelism` pipeline option, which
brings up multiple Python SDK worker processes.
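
For example, a minimal sketch of how this could be passed from Python
(the endpoint and worker count are illustrative placeholders):

```
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=DOCKER',
    # Number of SDK worker processes per Flink task manager;
    # 0 lets the runner choose automatically. 4 is just an example value.
    '--sdk_worker_parallelism=4',
])
```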

Best,
Max

On 30.04.20 23:56, Robbe Sneyders wrote:
> Yes, the task manager has one task slot per CPU core available, and the
> dashboard shows that the work is parallelized across multiple subtasks.
> 
> However, when using parallelism > 1, the pipeline stalls, the Task
> Manager starts throwing 'Output channel stalled' warnings, and high back
> pressure builds up at the Partition step, as shown in the tables below.
> 
> The Task Manager should have more than enough memory.
> JVM Heap Size: 30.0 GB
> Flink Managed Memory: 21.0 GB
> 
> Any idea what could cause this and how I could resolve it?
> 
> Parallelism = 1:
> 
> 1. CHAIN MapPartition (MapPartition at [1]Read
>    input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0)
>    -> FlatMap (FlatMap at ExtractOutput[0])
>    Status: RUNNING | Received: 43.0 KB (52 records) | Sent: 831 MB
>    (33060 records) | Parallelism: 1 | Duration: 3m 2s | Tasks: 1
> 2. Partition
>    Status: RUNNING | Received: 831 MB (33059 records) | Sent: 831 MB
>    (33059 records) | Parallelism: 1 | Duration: 2m 58s | Tasks: 1
> 3. CHAIN MapPartition (MapPartition at [4]{Discard array, Load json,
>    Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0])
>    Status: RUNNING | Received: 831 MB (33057 records) | Sent: 641 MB
>    (32439 records) | Parallelism: 1 | Duration: 2m 58s | Tasks: 1
> 4. CHAIN MapPartition (MapPartition at [3]Write
>    output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)})
>    -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) ->
>    GroupCombine (GroupCombine at GroupCombine: Write
>    output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
>    Status: RUNNING | Received: 641 MB (32438 records) | Sent: 0 B
>    (0 records) | Parallelism: 1 | Duration: 2m 58s | Tasks: 1
> 
> 
> Parallelism = 10:
> 
> 1. CHAIN MapPartition (MapPartition at [1]Read
>    input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0)
>    -> FlatMap (FlatMap at ExtractOutput[0])
>    Status: RUNNING | Received: 43.1 KB (52 records) | Sent: 493 MB
>    (19625 records) | Parallelism: 10 | Duration: 7m 15s | Tasks: 19
> 2. Partition
>    Status: RUNNING | Received: 486 MB (19363 records) | Sent: 486 MB
>    (19363 records) | Parallelism: 10 | Duration: 7m 14s | Tasks: 10
> 3. CHAIN MapPartition (MapPartition at [4]{Discard array, Load json,
>    Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0])
>    Status: RUNNING | Received: 477 MB (18987 records) | Sent: 0 B
>    (0 records) | Parallelism: 10 | Duration: 7m 14s | Tasks: 10
> 4. CHAIN MapPartition (MapPartition at [3]Write
>    output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)})
>    -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) ->
>    GroupCombine (GroupCombine at GroupCombine: Write
>    output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
>    Status: RUNNING | Received: 1.16 KB (0 records) | Sent: 0 B
>    (0 records) | Parallelism: 10 | Duration: 7m 14s | Tasks: 10
> 
> 
> 
> On Thu, 30 Apr 2020 at 22:35, Kyle Weaver <kcwea...@google.com
> <mailto:kcwea...@google.com>> wrote:
> 
>     If you are using only a single task manager but want to get
>     parallelism > 1, you will need to
>     increase taskmanager.numberOfTaskSlots in
>     your flink-conf.yaml. 
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
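> 
>     For example, in flink-conf.yaml (10 is just an illustrative value;
>     one slot per CPU core is a common choice):
> 
>     ```
>     taskmanager.numberOfTaskSlots: 10
>     ```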
> 
>     On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders
>     <robbe.sneyd...@ml6.eu <mailto:robbe.sneyd...@ml6.eu>> wrote:
> 
>         Hi Kyle,
> 
>         Thanks for the quick response.
>         The problem was that the pipeline could not access the input
>         file. Indeed, the Task Manager errors seem unrelated.
> 
>         I'm now able to run the pipeline completely, but I'm running
>         into problems when using parallelism. 
>         The pipeline can be summarized as:
>         read file -> shuffle -> process -> write files
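> 
>         In Beam Python terms, roughly (a sketch; the paths and the
>         process function are placeholders):
> 
>         ```
>         import apache_beam as beam
> 
>         with beam.Pipeline(options=options) as p:
>             (p
>              | 'Read input' >> beam.io.ReadFromText('/path/to/input')
>              | 'Shuffle' >> beam.Reshuffle()
>              | 'Process element' >> beam.Map(process_element)
>              | 'Write output' >> beam.io.WriteToText('/path/to/output'))
>         ```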
> 
>         When using parallelism > 1, the pipeline stalls and the Task
>         Manager outputs the following warning:
>         flink-taskmanager_1  | 2020-04-30 09:24:46,272 INFO
>          org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output
>         channel stalled for 255s, outbound thread CHAIN MapPartition
>         (MapPartition at [4]{Discard array, Load json, Process element,
>         Dump json}) -> FlatMap (FlatMap at ExtractOutput[0]) (7/10).
>         See: https://issues.apache.org/jira/browse/BEAM-4280 for the
>         history for this issue.
> 
>         The referenced issue [1] doesn't contain a lot of information
>         and is resolved. There is a Flink issue [2] that seems related,
>         although I'm not seeing the reported stacktrace. I guess this
>         problem occurs because I'm reading from and writing to the same
>         disk in parallel.
> 
>         Increasing the Task Manager memory seems to resolve the issue
>         partially. I'm still getting the stalled-channel warnings, but
>         the pipeline does proceed, albeit step-wise and slowly.
> 
>         Using the BATCH_FORCED execution mode removes the warnings, but
>         the pipeline still runs a lot slower than with parallelism=1.
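> 
>         For reference, I set this through the pipeline options, assuming
>         the snake_case spelling of the Flink runner's
>         executionModeForBatch option:
> 
>         ```
>         'execution_mode_for_batch': 'BATCH_FORCED',
>         ```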
> 
>         The pipeline shouldn't be I/O bound, so I guess I should still
>         be able to get some benefit out of running tasks in parallel?
> 
>         1. https://issues.apache.org/jira/browse/BEAM-4280
>         2. 
> https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692
> 
>         Kind regards,
>         Robbe
> 
> 
>         On Wed, 29 Apr 2020 at 19:28, Kyle Weaver <kcwea...@google.com
>         <mailto:kcwea...@google.com>> wrote:
> 
>             > This seems to have worked, as the output file is created
>             on the host system. However the pipeline silently fails, and
>             the output file remains empty.
> 
>             Have you checked the SDK container logs? They are most
>             likely to contain relevant failure information.
> 
>             > I don't know if this is a result of me rebuilding the Job
>             Server, or caused by another issue.
> 
>             Looks like there is an old but unresolved bug with the same
>             error: https://issues.apache.org/jira/browse/BEAM-5397
> 
>             On Wed, Apr 29, 2020 at 11:24 AM Robbe Sneyders
>             <robbe.sneyd...@ml6.eu <mailto:robbe.sneyd...@ml6.eu>> wrote:
> 
>                 Hi all,
> 
>                 We're working on a project where we're limited to one
>                 big development machine for now. We want to start
>                 developing data processing pipelines in Python, which
>                 should eventually be ported to a currently unknown setup
>                 on a separate cluster or cloud, so we went with Beam for
>                 its portability.
> 
>                 For the development setup, we wanted to have the least
>                 amount of overhead possible, so we deployed a one-node
>                 Flink cluster with docker-compose. The whole setup is
>                 defined by the following docker-compose.yml:
> 
>                 ```
>                 version: "2.1"
>                 services:
>                   flink-jobmanager:
>                     image: flink:1.9
>                     network_mode: host
>                     command: jobmanager
>                     environment:
>                       - JOB_MANAGER_RPC_ADDRESS=localhost
> 
>                   flink-taskmanager:
>                     image: flink:1.9
>                     network_mode: host
>                     depends_on:
>                       - flink-jobmanager
>                     command: taskmanager
>                     environment:
>                       - JOB_MANAGER_RPC_ADDRESS=localhost
>                     volumes:
>                       - staging-dir:/tmp/beam-artifact-staging
>                       - /usr/bin/docker:/usr/bin/docker
>                       - /var/run/docker.sock:/var/run/docker.sock
>                     user: flink:${DOCKER_GID}
> 
>                   beam-jobserver:
>                     image: apache/beam_flink1.9_job_server:2.20.0
>                     network_mode: host
>                     command: --flink-master=localhost:8081
>                     volumes:
>                       - staging-dir:/tmp/beam-artifact-staging
> 
>                 volumes:
>                   staging-dir:
>                 ```
> 
>                 We can submit and run pipelines with the following options:
>                 ```
>                 'runner': 'PortableRunner',
>                 'job_endpoint': 'localhost:8099',
>                 ```
>                 The environment type for the SDK Harness is configured
>                 to the default 'docker'.
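> 
>                 Spelled out in Python, this looks roughly like (a
>                 sketch; 'environment_type' is included explicitly only
>                 for clarity):
> 
>                 ```
>                 from apache_beam.options.pipeline_options import PipelineOptions
> 
>                 options = PipelineOptions.from_dictionary({
>                     'runner': 'PortableRunner',
>                     'job_endpoint': 'localhost:8099',
>                     # DOCKER is the default SDK harness environment type.
>                     'environment_type': 'DOCKER',
>                 })
>                 ```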
> 
>                 However, we cannot write output files to the host
>                 system. To fix this, I tried to mount a host directory
>                 to the Beam SDK Container (I had to rebuild the Beam Job
>                 Server jar and image to do this). This seems to have
>                 worked, as the output file is created on the host
>                 system. However the pipeline silently fails, and the
>                 output file remains empty. Running the pipeline with
>                 DirectRunner confirms that the pipeline is working.
> 
>                 Looking at the output logs, the following error is
>                 thrown in the Flink Task Manager:
>                 flink-taskmanager_1  | java.lang.NoClassDefFoundError:
>                 
> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1 
>                 I don't know if this is a result of me rebuilding the
>                 Job Server, or caused by another issue.
> 
>                 We currently do not have a distributed file system
>                 available. Is there any way to make writing to the host
>                 system possible?
> 
>                 Kind regards,
>                 Robbe
> 