Hi Kyle,

Thanks for the quick response.
The problem was that the pipeline could not access the input file. The Task
Manager errors do indeed seem unrelated.

I'm now able to run the pipeline completely, but I'm running into problems
when using parallelism.
The pipeline can be summarized as:
read file -> shuffle -> process -> write files
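
In Beam Python terms, the shape is roughly the following (a minimal sketch;
the file paths and the processing DoFn are placeholders, not the actual code):
```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class ProcessElement(beam.DoFn):
    """Stand-in for the real per-element processing step."""

    def process(self, element):
        # The real pipeline loads JSON, processes it, and dumps JSON again.
        yield element


options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
])

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | 'Read file' >> beam.io.ReadFromText('/path/to/input.jsonl')
     | 'Shuffle' >> beam.Reshuffle()
     | 'Process' >> beam.ParDo(ProcessElement())
     | 'Write files' >> beam.io.WriteToText('/path/to/output'))
```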

When using parallelism > 1, the pipeline stalls and the Task Manager
outputs the following warnings:
flink-taskmanager_1  | 2020-04-30 09:24:46,272 INFO org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output channel stalled for 255s, outbound thread CHAIN MapPartition (MapPartition at [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0]) (7/10). See: https://issues.apache.org/jira/browse/BEAM-4280 for the history for this issue.

The referenced issue [1] doesn't contain much information and is already
resolved. There is a Flink issue [2] that seems related, although I'm not
seeing the reported stack trace. My guess is that the problem occurs because
I'm reading from and writing to the same disk in parallel.

Increasing the Task Manager memory seems to partially resolve the issue:
I'm still getting the stalled-channel warnings, but the pipeline does
proceed, albeit step-wise and slowly.

Using the BATCH_FORCED execution mode removes the warnings, but the
pipeline still runs a lot slower than with parallelism=1.
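
For reference, the relevant options are passed in the same style as the
options further below, roughly like this (a sketch: the parallelism value is
just an example, and exact option names may vary between Beam versions):
```
'runner': 'PortableRunner',
'job_endpoint': 'localhost:8099',
'parallelism': 10,
'execution_mode_for_batch': 'BATCH_FORCED',
```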

The pipeline shouldn't be I/O bound, so I guess I should still be able to
get some benefit from running tasks in parallel?

1. https://issues.apache.org/jira/browse/BEAM-4280
2. https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692

Kind regards,
Robbe

Robbe Sneyders

ML6 Gent

M: +32 474 71 31 08


On Wed, 29 Apr 2020 at 19:28, Kyle Weaver <[email protected]> wrote:

> > This seems to have worked, as the output file is created on the host
> > system. However, the pipeline silently fails and the output file remains
> > empty.
>
> Have you checked the SDK container logs? They are most likely to contain
> relevant failure information.
>
> > I don't know if this is a result of me rebuilding the Job Server, or
> caused by another issue.
>
> Looks like there is an old but unresolved bug with the same error:
> https://issues.apache.org/jira/browse/BEAM-5397
>
> On Wed, Apr 29, 2020 at 11:24 AM Robbe Sneyders <[email protected]>
> wrote:
>
>> Hi all,
>>
>> We're working on a project where we're limited to one big development
>> machine for now. We want to start developing data processing pipelines in
>> Python, which should eventually be ported to a currently unknown setup on a
>> separate cluster or cloud, so we went with Beam for its portability.
>>
>> For the development setup, we wanted as little overhead as possible, so we
>> deployed a one-node Flink cluster with docker-compose. The whole setup is
>> defined by the following docker-compose.yml:
>>
>> ```
>> version: "2.1"
>> services:
>>   flink-jobmanager:
>>     image: flink:1.9
>>     network_mode: host
>>     command: jobmanager
>>     environment:
>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>
>>   flink-taskmanager:
>>     image: flink:1.9
>>     network_mode: host
>>     depends_on:
>>       - flink-jobmanager
>>     command: taskmanager
>>     environment:
>>       - JOB_MANAGER_RPC_ADDRESS=localhost
>>     volumes:
>>       - staging-dir:/tmp/beam-artifact-staging
>>       - /usr/bin/docker:/usr/bin/docker
>>       - /var/run/docker.sock:/var/run/docker.sock
>>     user: flink:${DOCKER_GID}
>>
>>   beam-jobserver:
>>     image: apache/beam_flink1.9_job_server:2.20.0
>>     network_mode: host
>>     command: --flink-master=localhost:8081
>>     volumes:
>>       - staging-dir:/tmp/beam-artifact-staging
>>
>> volumes:
>>   staging-dir:
>> ```
>>
>> We can submit and run pipelines with the following options:
>> ```
>> 'runner': 'PortableRunner',
>> 'job_endpoint': 'localhost:8099',
>> ```
>> The environment type for the SDK Harness is configured to the default
>> 'docker'.
>>
>> However, we cannot write output files to the host system. To fix this,
>> I tried to mount a host directory into the Beam SDK container (I had to
>> rebuild the Beam Job Server jar and image to do this). This seems to have
>> worked, as the output file is created on the host system. However, the
>> pipeline silently fails and the output file remains empty. Running the
>> pipeline with the DirectRunner confirms that the pipeline itself is working.
>>
>> Looking at the output logs, the following error is thrown in the Flink
>> Task Manager:
>> flink-taskmanager_1  | java.lang.NoClassDefFoundError:
>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
>> I don't know if this is a result of me rebuilding the Job Server, or
>> caused by another issue.
>>
>> We currently do not have a distributed file system available. Is there
>> any way to make writing to the host system possible?
>>
>> Kind regards,
>> Robbe
>>
>> Robbe Sneyders
>>
>> ML6 Gent
>>
>> M: +32 474 71 31 08
>>
>
