This looks a lot like FLINK-10672; maybe leave a comment there. If you have a simple pipeline you can share that reproduces the error, that would be extremely helpful.
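For reference, a minimal sketch of what such a reproducer could look like, based only on the pipeline shape Robbe describes below (read file -> shuffle -> process -> write files). The file paths, the JSON round-trip, and the use of beam.Reshuffle as the shuffle stage are placeholder assumptions, not the actual pipeline:

```
# Minimal sketch of a reproducer with the shape described in this thread:
# read file -> shuffle -> process -> write files.
# Paths, the JSON round-trip, and Reshuffle are placeholders.
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--parallelism=10',  # the stall reportedly appears only when > 1
])

with beam.Pipeline(options=options) as p:
    (p
     | 'Read input' >> beam.io.ReadFromText('/tmp/input.jsonl')
     | 'Shuffle' >> beam.Reshuffle()
     | 'Process element' >> beam.Map(lambda line: json.dumps(json.loads(line)))
     | 'Write output' >> beam.io.WriteToText('/tmp/output'))
```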
On Thu, Apr 30, 2020 at 5:56 PM Robbe Sneyders <[email protected]> wrote:

> Yes, the task manager has one task slot per CPU core available, and the dashboard shows that the work is parallelized across multiple subtasks.
>
> However, when running with parallelism > 1, the pipeline stalls, the Task Manager starts throwing 'Output channel stalled' warnings, and high back pressure builds up at the Partition step, as shown in the tables below.
>
> The Task Manager should have more than enough memory:
> JVM Heap Size: 30.0 GB
> Flink Managed Memory: 21.0 GB
>
> Any idea what could cause this and how I could resolve it?
>
> Parallelism = 1:
>
> | Name | Status | Bytes Received | Records Received | Bytes Sent | Records Sent | Parallelism | Start Time | Duration | End Time | Tasks |
> |---|---|---|---|---|---|---|---|---|---|---|
> | CHAIN MapPartition (MapPartition at [1]Read input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0) -> FlatMap (FlatMap at ExtractOutput[0]) | RUNNING | 43.0 KB | 52 | 831 MB | 33060 | 1 | 43951.97782 | 3m 2s | - | 1 |
> | Partition | RUNNING | 831 MB | 33059 | 831 MB | 33059 | 1 | 43951.97788 | 2m 58s | - | 1 |
> | CHAIN MapPartition (MapPartition at [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0]) | RUNNING | 831 MB | 33057 | 641 MB | 32439 | 1 | 43951.97788 | 2m 58s | - | 1 |
> | CHAIN MapPartition (MapPartition at [3]Write output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)}) -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: Write output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor) | RUNNING | 641 MB | 32438 | 0 B | 0 | 1 | 43951.97787 | 2m 58s | - | 1 |
>
> Parallelism = 10:
>
> | Name | Status | Bytes Received | Records Received | Bytes Sent | Records Sent | Parallelism | Start Time | Duration | End Time | Tasks |
> |---|---|---|---|---|---|---|---|---|---|---|
> | CHAIN MapPartition (MapPartition at [1]Read input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0) -> FlatMap (FlatMap at ExtractOutput[0]) | RUNNING | 43.1 KB | 52 | 493 MB | 19625 | 10 | 43951.9834 | 7m 15s | - | 19 |
> | Partition | RUNNING | 486 MB | 19363 | 486 MB | 19363 | 10 | 43951.9834 | 7m 14s | - | 10 |
> | CHAIN MapPartition (MapPartition at [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0]) | RUNNING | 477 MB | 18987 | 0 B | 0 | 10 | 43951.98341 | 7m 14s | - | 10 |
> | CHAIN MapPartition (MapPartition at [3]Write output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)}) -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: Write output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor) | RUNNING | 1.16 KB | 0 | 0 B | 0 | 10 | 43951.9834 | 7m 14s | - | 10 |
>
> Robbe Sneyders
> ML6 Gent
>
> On Thu, 30 Apr 2020 at 22:35, Kyle Weaver <[email protected]> wrote:
>
>> If you are using only a single task manager but want to get parallelism > 1, you will need to increase taskmanager.numberOfTaskSlots in your flink-conf.yaml.
>> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
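For reference, the setting Kyle mentions lives in the task manager's flink-conf.yaml. A minimal sketch, assuming a 10-core machine and Flink 1.9 option names; the memory values are chosen to mirror the figures Robbe reports above:

```
# flink-conf.yaml (task manager) -- illustrative values, Flink 1.9 option names
taskmanager.numberOfTaskSlots: 10   # one slot per available CPU core
taskmanager.heap.size: 30g          # matches the reported 30.0 GB JVM heap
taskmanager.memory.fraction: 0.7    # yields roughly the reported 21.0 GB managed memory
```

With the docker-compose setup shown later in the thread, one common approach is to mount a modified flink-conf.yaml into the flink-taskmanager container.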
>> On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders <[email protected]> wrote:
>>
>>> Hi Kyle,
>>>
>>> Thanks for the quick response. The problem was that the pipeline could not access the input file. The Task Manager errors indeed seem unrelated.
>>>
>>> I'm now able to run the pipeline completely, but I'm running into problems when using parallelism. The pipeline can be summarized as:
>>> read file -> shuffle -> process -> write files
>>>
>>> When using parallelism > 1, the pipeline stalls and the Task Manager outputs the following warning:
>>> flink-taskmanager_1 | 2020-04-30 09:24:46,272 INFO org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output channel stalled for 255s, outbound thread CHAIN MapPartition (MapPartition at [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap (FlatMap at ExtractOutput[0]) (7/10). See: https://issues.apache.org/jira/browse/BEAM-4280 for the history for this issue.
>>>
>>> The referenced issue [1] doesn't contain much information and is resolved. There is a Flink issue [2] that seems related, although I'm not seeing the reported stack trace. I guess this problem occurs because I'm reading from and writing to the same disk in parallel.
>>>
>>> Increasing the Task Manager memory seems to resolve the issue partially: I still get the stalled channel warnings, but the pipeline does proceed, step-wise but slowly.
>>>
>>> Using the BATCH_FORCED execution mode removes the warnings, but the pipeline still runs a lot slower than with parallelism = 1.
>>>
>>> The pipeline shouldn't be I/O bound, so I guess I should still be able to get some benefit out of running tasks in parallel?
>>>
>>> 1. https://issues.apache.org/jira/browse/BEAM-4280
>>> 2. https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692
>>>
>>> Kind regards,
>>> Robbe
>>>
>>> On Wed, 29 Apr 2020 at 19:28, Kyle Weaver <[email protected]> wrote:
>>>
>>>> > This seems to have worked, as the output file is created on the host system. However the pipeline silently fails, and the output file remains empty.
>>>>
>>>> Have you checked the SDK container logs? They are most likely to contain relevant failure information.
>>>>
>>>> > I don't know if this is a result of me rebuilding the Job Server, or caused by another issue.
>>>>
>>>> Looks like there is an old but unresolved bug with the same error: https://issues.apache.org/jira/browse/BEAM-5397
>>>>
>>>> On Wed, Apr 29, 2020 at 11:24 AM Robbe Sneyders <[email protected]> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We're working on a project where we're limited to one big development machine for now. We want to start developing data processing pipelines in Python, which should eventually be ported to a currently unknown setup on a separate cluster or cloud, so we went with Beam for its portability.
>>>>>
>>>>> For the development setup, we wanted the least amount of overhead possible, so we deployed a one-node Flink cluster with docker-compose.
>>>>> The whole setup is defined by the following docker-compose.yml:

```
version: "2.1"
services:
  flink-jobmanager:
    image: flink:1.9
    network_mode: host
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=localhost

  flink-taskmanager:
    image: flink:1.9
    network_mode: host
    depends_on:
      - flink-jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=localhost
    volumes:
      - staging-dir:/tmp/beam-artifact-staging
      - /usr/bin/docker:/usr/bin/docker
      - /var/run/docker.sock:/var/run/docker.sock
    user: flink:${DOCKER_GID}

  beam-jobserver:
    image: apache/beam_flink1.9_job_server:2.20.0
    network_mode: host
    command: --flink-master=localhost:8081
    volumes:
      - staging-dir:/tmp/beam-artifact-staging

volumes:
  staging-dir:
```

>>>>> We can submit and run pipelines with the following options:

```
'runner': 'PortableRunner',
'job_endpoint': 'localhost:8099',
```

>>>>> The environment type for the SDK harness is configured to the default 'docker'.
>>>>>
>>>>> However, we cannot write output files to the host system. To fix this, I tried to mount a host directory into the Beam SDK container (I had to rebuild the Beam Job Server jar and image to do this). This seems to have worked, as the output file is created on the host system. However, the pipeline silently fails and the output file remains empty. Running the pipeline with the DirectRunner confirms that the pipeline works.
>>>>>
>>>>> Looking at the output logs, the following error is thrown in the Flink Task Manager:
>>>>> flink-taskmanager_1 | java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
>>>>> I don't know whether this is a result of me rebuilding the Job Server or caused by another issue.
>>>>>
>>>>> We currently do not have a distributed file system available. Is there any way to make writing to the host system possible?
>>>>>
>>>>> Kind regards,
>>>>> Robbe
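For reference, a sketch of how the options discussed in this thread could be combined on the Python side. `job_endpoint` and `environment_type` are standard portable-runner options and `parallelism` is documented for the Flink runner; `execution_mode_for_batch` is the Flink runner option behind the BATCH_FORCED mode mentioned above, and whether a given Beam version forwards it through the job server is an assumption here:

```
# Sketch: portable-runner options combining the settings discussed in this
# thread. Whether every option is forwarded to the Flink job server can
# depend on the Beam version; treat this as illustrative, not definitive.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=DOCKER',                # default SDK harness environment
    '--parallelism=10',                         # Flink job parallelism
    '--execution_mode_for_batch=BATCH_FORCED',  # force fully materialized shuffles
])
```

BATCH_FORCED makes each producing stage finish before its consumer starts, which avoids stalled pipelined channels at the cost of materializing intermediate data; that trade-off is consistent with the slower but warning-free runs reported above.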
