Yes, the task manager has one task slot per CPU core available, and the
dashboard shows that the work is parallelized across multiple subtasks.
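
For reference, a minimal sketch of how one slot per core maps onto the taskmanager.numberOfTaskSlots setting in flink-conf.yaml. The helper name task_slots_line is hypothetical, and the sketch assumes it runs on the same machine as the Task Manager, so that os.cpu_count() reflects the cores available to it:

```python
import os


def task_slots_line(cores=None):
    """Return the flink-conf.yaml line for one task slot per CPU core.

    `cores` is an optional override; by default the local core count is used,
    falling back to 1 if it cannot be determined.
    """
    cores = cores or os.cpu_count() or 1
    return f"taskmanager.numberOfTaskSlots: {cores}"


# Print the line to place in flink-conf.yaml on this machine.
print(task_slots_line())
```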

However, with parallelism > 1 the pipeline stalls, the Task Manager
starts logging 'Output channel stalled' warnings, and heavy back pressure
builds up at the Partition step, as shown in the tables below.

The Task Manager should have more than enough memory.
JVM Heap Size: 30.0 GB
Flink Managed Memory: 21.0 GB

Any idea what could cause this and how I could resolve it?

Parallelism = 1 (each operator name is followed by its row of metrics):

  Status   Bytes Received  Records Received  Bytes Sent  Records Sent  Parallelism  Start Time   Duration  End Time  Tasks

CHAIN MapPartition (MapPartition at [1]Read input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0) -> FlatMap (FlatMap at ExtractOutput[0])
  RUNNING  43.0 KB         52                831 MB      33060         1            43951.97782  3m 2s     -         1
Partition
  RUNNING  831 MB          33059             831 MB      33059         1            43951.97788  2m 58s    -         1
CHAIN MapPartition (MapPartition at [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0])
  RUNNING  831 MB          33057             641 MB      32439         1            43951.97788  2m 58s    -         1
CHAIN MapPartition (MapPartition at [3]Write output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)}) -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: Write output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
  RUNNING  641 MB          32438             0 B         0             1            43951.97787  2m 58s    -         1

Parallelism = 10 (each operator name is followed by its row of metrics):

  Status   Bytes Received  Records Received  Bytes Sent  Records Sent  Parallelism  Start Time   Duration  End Time  Tasks

CHAIN MapPartition (MapPartition at [1]Read input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0) -> FlatMap (FlatMap at ExtractOutput[0])
  RUNNING  43.1 KB         52                493 MB      19625         10           43951.9834   7m 15s    -         19
Partition
  RUNNING  486 MB          19363             486 MB      19363         10           43951.9834   7m 14s    -         10
CHAIN MapPartition (MapPartition at [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0])
  RUNNING  477 MB          18987             0 B         0             10           43951.98341  7m 14s    -         10
CHAIN MapPartition (MapPartition at [3]Write output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)}) -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: Write output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
  RUNNING  1.16 KB         0                 0 B         0             10           43951.9834   7m 14s    -         10
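
To compare the two runs, a rough throughput calculation helps. The sketch below uses only the Records Sent and Duration values of the first (read) stage from the tables above. The helper to_seconds and the variable names are hypothetical, and since both jobs were still RUNNING when the numbers were captured, these are snapshot rates rather than final throughput:

```python
def to_seconds(duration):
    """Convert a dashboard duration such as '3m 2s' to a number of seconds."""
    units = {"h": 3600, "m": 60, "s": 1}
    return sum(int(part[:-1]) * units[part[-1]] for part in duration.split())


# parallelism -> (records sent by the read stage, stage duration),
# copied from the two dashboard tables.
runs = {
    1: (33060, "3m 2s"),
    10: (19625, "7m 15s"),
}

# Records per second leaving the read stage at the time of the snapshot.
rates = {p: records / to_seconds(d) for p, (records, d) in runs.items()}

for p, rate in sorted(rates.items()):
    print(f"parallelism={p}: {rate:.1f} records/s out of the read stage")
```

By this measure the read stage emits records roughly four times more slowly with parallelism 10 than with parallelism 1.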

Robbe Sneyders

ML6 Gent
<https://www.google.be/maps/place/ML6/@51.037408,3.7044893,17z/data=!3m1!4b1!4m5!3m4!1s0x47c37161feeca14b:0xb8f72585fdd21c90!8m2!3d51.037408!4d3.706678?hl=nl>

M: +32 474 71 31 08


On Thu, 30 Apr 2020 at 22:35, Kyle Weaver wrote:

> If you are using only a single task manager but want to get parallelism
> greater than 1, you will need to increase taskmanager.numberOfTaskSlots in
> your flink-conf.yaml.
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
>
> On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders wrote:
>
>> Hi Kyle,
>>
>> Thanks for the quick response.
>> The problem was that the pipeline could not access the input file. The
>> Task Manager errors seem unrelated indeed.
>>
>> I'm now able to run the pipeline completely, but I'm running into
>> problems when using parallelism.
>> The pipeline can be summarized as:
>> read file -> shuffle -> process -> write files
>>
>> When using parallelism > 1, the pipeline stalls and the Task Manager
>> outputs the following warnings:
>> flink-taskmanager_1  | 2020-04-30 09:24:46,272 INFO
>>  org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output channel
>> stalled for 255s, outbound thread CHAIN MapPartition (MapPartition at
>> [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap
>> (FlatMap at ExtractOutput[0]) (7/10). See:
>> https://issues.apache.org/jira/browse/BEAM-4280 for the history for this
>> issue.
>>
>> The referenced issue [1] doesn't contain a lot of information and is
>> resolved. There is a Flink issue [2] that seems related, although I'm not
>> seeing the reported stack trace. I guess this problem occurs because I'm
>> reading from and writing to the same disk in parallel.
>>
>> Increasing the Task Manager memory seems to resolve the issue partially.
>> I'm still getting the stalled channel warnings, but the pipeline now
>> proceeds, albeit step-wise and slowly.
>>
>> Using BATCH_FORCED execution mode removes the warnings, but the pipeline
>> still runs a lot slower than with parallelism=1.
>>
>> The pipeline shouldn't be I/O bound, so I guess I should still be able
>> to get some benefit out of running tasks in parallel?
>>
>> 1. https://issues.apache.org/jira/browse/BEAM-4280
>> 2.
>> https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692
>>
>> Kind regards,
>> Robbe
>>
>> On Wed, 29 Apr 2020 at 19:28, Kyle Weaver wrote:
>>
>>> > This seems to have worked, as the output file is created on the host
>>> system. However the pipeline silently fails, and the output file remains
>>> empty.
>>>
>>> Have you checked the SDK container logs? They are most likely to contain
>>> relevant failure information.
>>>
>>> > I don't know if this is a result of me rebuilding the Job Server, or
>>> caused by another issue.
>>>
>>> Looks like there is an old but unresolved bug with the same error:
>>> https://issues.apache.org/jira/browse/BEAM-5397
>>>
>>> On Wed, Apr 29, 2020 at 11:24 AM Robbe Sneyders wrote:
>>>
>>>> Hi all,
>>>>
>>>> We're working on a project where we're limited to one big development
>>>> machine for now. We want to start developing data processing pipelines in
>>>> Python, which should eventually be ported to a currently unknown setup on a
>>>> separate cluster or cloud, so we went with Beam for its portability.
>>>>
>>>> For the development setup, we wanted as little overhead as possible,
>>>> so we deployed a one-node Flink cluster with docker-compose. The whole
>>>> setup is defined by the following docker-compose.yml:
>>>>
>>>> ```
>>>> version: "2.1"
>>>> services:
>>>>   flink-jobmanager:
>>>>     image: flink:1.9
>>>>     network_mode: host
>>>>     command: jobmanager
>>>>     environment:
>>>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>>>
>>>>   flink-taskmanager:
>>>>     image: flink:1.9
>>>>     network_mode: host
>>>>     depends_on:
>>>>       - flink-jobmanager
>>>>     command: taskmanager
>>>>     environment:
>>>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>>>     volumes:
>>>>       - staging-dir:/tmp/beam-artifact-staging
>>>>       - /usr/bin/docker:/usr/bin/docker
>>>>       - /var/run/docker.sock:/var/run/docker.sock
>>>>     user: flink:${DOCKER_GID}
>>>>
>>>>   beam-jobserver:
>>>>     image: apache/beam_flink1.9_job_server:2.20.0
>>>>     network_mode: host
>>>>     command: --flink-master=localhost:8081
>>>>     volumes:
>>>>       - staging-dir:/tmp/beam-artifact-staging
>>>>
>>>> volumes:
>>>>   staging-dir:
>>>> ```
>>>>
>>>> We can submit and run pipelines with the following options:
>>>> ```
>>>> 'runner': 'PortableRunner',
>>>> 'job_endpoint': 'localhost:8099',
>>>> ```
>>>> The environment type for the SDK Harness is configured to the default
>>>> 'docker'.
>>>>
>>>> However, we cannot write output files to the host system. To fix this,
>>>> I tried to mount a host directory to the Beam SDK Container (I had to
>>>> rebuild the Beam Job Server jar and image to do this). This seems to have
>>>> worked, as the output file is created on the host system. However the
>>>> pipeline silently fails, and the output file remains empty. Running the
>>>> pipeline with DirectRunner confirms that the pipeline is working.
>>>>
>>>> Looking at the output logs, the following error is thrown in the Flink
>>>> Task Manager:
>>>> flink-taskmanager_1  | java.lang.NoClassDefFoundError:
>>>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
>>>> I don't know if this is a result of me rebuilding the Job Server, or
>>>> caused by another issue.
>>>>
>>>> We currently do not have a distributed file system available. Is there
>>>> any way to make writing to the host system possible?
>>>>
>>>> Kind regards,
>>>> Robbe
>>>>
>>>
