This does look a lot like FLINK-10672.. maybe leave a comment there.

If you have a simple pipeline you can share that can reproduce the error,
that would be extremely helpful.

On Thu, Apr 30, 2020 at 5:56 PM Robbe Sneyders <[email protected]>
wrote:

> Yes, the task manager has one task slot per CPU core available, and the
> dashboard shows that the work is parallelized across multiple subtasks.
>
> However when using parallelism, the pipeline stalls, the Task Manager
> starts throwing 'Output channel stalled' warnings, and high back pressure
> is created at the Partition step as is shown in the tables below.
>
> The Task Manager should have more than enough memory.
> JVM Heap Size: 30.0 GB
> Flink Managed Memory: 21.0 GB
>
> Any idea what could cause this and how I could resolve it?
>
> Parallelism = 1:
> Name Status Bytes Received Records Received Bytes Sent Records Sent
> Parallelism Start Time Duration End Time Tasks
> CHAIN MapPartition (MapPartition at [1]Read
> input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0)
> -> FlatMap (FlatMap at ExtractOutput[0]) RUNNING 43.0 KB 52 831 MB 33060 1
> 43951.97782 3m 2s - 1
> Partition RUNNING 831 MB 33059 831 MB 33059 1 43951.97788 2m 58s - 1
> CHAIN MapPartition (MapPartition at [4]{Discard array, Load json, Process
> element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0]) RUNNING 831
> MB 33057 641 MB 32439 1 43951.97788 2m 58s - 1
> CHAIN MapPartition (MapPartition at [3]Write
> output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)}) ->
> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) ->
> GroupCombine (GroupCombine at GroupCombine: Write
> output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor) RUNNING 641 MB
> 32438 0 B 0 1 43951.97787 2m 58s - 1
>
> Parallelism = 10:
> Name Status Bytes Received Records Received Bytes Sent Records Sent
> Parallelism Start Time Duration End Time Tasks
> CHAIN MapPartition (MapPartition at [1]Read
> input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0)
> -> FlatMap (FlatMap at ExtractOutput[0]) RUNNING 43.1 KB 52 493 MB 19625
> 10 43951.9834 7m 15s - 19
> Partition RUNNING 486 MB 19363 486 MB 19363 10 43951.9834 7m 14s - 10
> CHAIN MapPartition (MapPartition at [4]{Discard array, Load json, Process
> element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0]) RUNNING 477
> MB 18987 0 B 0 10 43951.98341 7m 14s - 10
> CHAIN MapPartition (MapPartition at [3]Write
> output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)}) ->
> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) ->
> GroupCombine (GroupCombine at GroupCombine: Write
> output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor) RUNNING 1.16 KB
> 0 0 B 0 10 43951.9834 7m 14s - 10
>
>  [image: https://ml6.eu] <https://ml6.eu>
>
> Robbe Sneyders
>
> ML6 Gent
> <https://www.google.be/maps/place/ML6/@51.037408,3.7044893,17z/data=!3m1!4b1!4m5!3m4!1s0x47c37161feeca14b:0xb8f72585fdd21c90!8m2!3d51.037408!4d3.706678?hl=nl>
>
> M: +32 474 71 31 08 <+32%20474%2071%2031%2008>
>
>
> On Thu, 30 Apr 2020 at 22:35, Kyle Weaver <[email protected]> wrote:
>
>> If you are using only a single task manager but want to get parallelism >
>> 1, you will need to increase taskmanager.numberOfTaskSlots in
>> your flink-conf.yaml.
>> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
>>
>> On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders <[email protected]>
>> wrote:
>>
>>> Hi Kyle,
>>>
>>> Thanks for the quick response.
>>> The problem was that the pipeline could not access the input file. The
>>> Task Manager errors seem unrelated indeed.
>>>
>>> I'm now able to run the pipeline completely, but I'm running into
>>> problems when using parallelism.
>>> The pipeline can be summarized as:
>>> read file -> shuffle -> process -> write files
>>>
>>> When using parallelism > 1, the pipeline stalls and the Task Manager
>>> outputs following warnings:
>>> flink-taskmanager_1  | 2020-04-30 09:24:46,272 INFO
>>>  org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output channel
>>> stalled for 255s, outbound thread CHAIN MapPartition (MapPartition at
>>> [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap
>>> (FlatMap at ExtractOutput[0]) (7/10). See:
>>> https://issues.apache.org/jira/browse/BEAM-4280 for the history for
>>> this issue.
>>>
>>> The referenced issue [1] doesn't contain a lot of information and is
>>> resolved. There is a Flink issue [2] that seems related, although I'm not
>>> seeing the reported stacktrace. I guess this problem occurs since I'm
>>> reading and writing to the same disc in parallel.
>>>
>>> Increasing the Task Manager memory seems to resolve the issue partially.
>>> I'm still getting the stalled channel warnings, but the pipeline does
>>> proceed step-wise but slowly.
>>>
>>> Using BATCH_FORCED execution mode removes the warnings, but still runs a
>>> lot slower than running with parallelism=1.
>>>
>>> The pipeline shouldn't be I/O bounded, so I guess I should still be able
>>> to get some benefit out of running tasks in parallel?
>>>
>>> 1. https://issues.apache.org/jira/browse/BEAM-4280
>>> 2.
>>> https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692
>>>
>>> Kind regards,
>>> Robbe
>>>
>>>  [image: https://ml6.eu] <https://ml6.eu>
>>>
>>> Robbe Sneyders
>>>
>>> ML6 Gent
>>> <https://www.google.be/maps/place/ML6/@51.037408,3.7044893,17z/data=!3m1!4b1!4m5!3m4!1s0x47c37161feeca14b:0xb8f72585fdd21c90!8m2!3d51.037408!4d3.706678?hl=nl>
>>>
>>> M: +32 474 71 31 08 <+32%20474%2071%2031%2008>
>>>
>>>
>>> On Wed, 29 Apr 2020 at 19:28, Kyle Weaver <[email protected]> wrote:
>>>
>>>> > This seems to have worked, as the output file is created on the host
>>>> system. However the pipeline silently fails, and the output file remains
>>>> empty.
>>>>
>>>> Have you checked the SDK container logs? They are most likely to
>>>> contain relevant failure information.
>>>>
>>>> > I don't know if this is a result of me rebuilding the Job Server, or
>>>> caused by another issue.
>>>>
>>>> Looks like there is an old but unresolved bug with the same error:
>>>> https://issues.apache.org/jira/browse/BEAM-5397
>>>>
>>>> On Wed, Apr 29, 2020 at 11:24 AM Robbe Sneyders <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We're working on a project where we're limited to one big development
>>>>> machine for now. We want to start developing data processing pipelines in
>>>>> Python, which should eventually be ported to a currently unknown setup on 
>>>>> a
>>>>> separate cluster or cloud, so we went with Beam for its portability.
>>>>>
>>>>> For the development setup, we wanted to have the least amount of
>>>>> overhead possible, so we deployed a one node flink cluster with
>>>>> docker-compose. The whole setup is defined by the following
>>>>> docker-compose.yml:
>>>>>
>>>>> ```
>>>>> version: "2.1"
>>>>> services:
>>>>>   flink-jobmanager:
>>>>>     image: flink:1.9
>>>>>     network_mode: host
>>>>>     command: jobmanager
>>>>>     environment:
>>>>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>>>>
>>>>>   flink-taskmanager:
>>>>>     image: flink:1.9
>>>>>     network_mode: host
>>>>>     depends_on:
>>>>>       - flink-jobmanager
>>>>>     command: taskmanager
>>>>>     environment:
>>>>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>>>>     volumes:
>>>>>       - staging-dir:/tmp/beam-artifact-staging
>>>>>       - /usr/bin/docker:/usr/bin/docker
>>>>>       - /var/run/docker.sock:/var/run/docker.sock
>>>>>     user: flink:${DOCKER_GID}
>>>>>
>>>>>   beam-jobserver:
>>>>>     image: apache/beam_flink1.9_job_server:2.20.0
>>>>>     network_mode: host
>>>>>     command: --flink-master=localhost:8081
>>>>>     volumes:
>>>>>       - staging-dir:/tmp/beam-artifact-staging
>>>>>
>>>>> volumes:
>>>>>   staging-dir:
>>>>> ```
>>>>>
>>>>> We can submit and run pipelines with the following options:
>>>>> ```
>>>>> 'runner': 'PortableRunner',
>>>>> 'job_endpoint': 'localhost:8099',
>>>>> ```
>>>>> The environment type for the SDK Harness is configured to the default
>>>>> 'docker'.
>>>>>
>>>>> However, we cannot write output files to the host system. To fix this,
>>>>> I tried to mount a host directory to the Beam SDK Container (I had to
>>>>> rebuild the Beam Job Server jar and image to do this). This seems to have
>>>>> worked, as the output file is created on the host system. However the
>>>>> pipeline silently fails, and the output file remains empty. Running the
>>>>> pipeline with DirectRunner confirms that the pipeline is working.
>>>>>
>>>>> Looking at the output logs, the following error is thrown in the Flink
>>>>> Task Manager:
>>>>> flink-taskmanager_1  | java.lang.NoClassDefFoundError:
>>>>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
>>>>> I don't know if this is a result of me rebuilding the Job Server, or
>>>>> caused by another issue.
>>>>>
>>>>> We currently do not have a distributed file system available. Is there
>>>>> any way to make writing to the host system possible?
>>>>>
>>>>> Kind regards,
>>>>> Robbe
>>>>>
>>>>>  [image: https://ml6.eu] <https://ml6.eu>
>>>>>
>>>>> Robbe Sneyders
>>>>>
>>>>> ML6 Gent
>>>>> <https://www.google.be/maps/place/ML6/@51.037408,3.7044893,17z/data=!3m1!4b1!4m5!3m4!1s0x47c37161feeca14b:0xb8f72585fdd21c90!8m2!3d51.037408!4d3.706678?hl=nl>
>>>>>
>>>>> M: +32 474 71 31 08 <+32%20474%2071%2031%2008>
>>>>>
>>>>

Reply via email to