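This type of stack trace occurs when the downstream operator is blocked for some reason. Flink maintains a finite number of network buffers for each network channel. If the receiving downstream operator does not process incoming network buffers, the upstream operator blocks once all of its buffers are in use. This is called backpressure, and it is a useful feature for avoiding data congestion. Because checkpoint barriers travel through the same network channels as the records, sustained backpressure also delays the barriers, which can make checkpoints time out.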
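To illustrate the blocking behaviour, here is a toy Java sketch. It is an analogy, not Flink's network stack: a bounded ArrayBlockingQueue stands in for the finite buffer pool of one channel, and put() blocking plays the role of requestMemorySegmentBlocking(). The class name BackpressureDemo and the buffer count of 4 are hypothetical. The producer does no work of its own, yet once the queue is full it is throttled to the pace of the slow consumer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of backpressure, NOT Flink's actual implementation: a
// fixed-capacity queue stands in for the finite pool of buffers on one
// channel, and put() blocking mirrors the upstream thread waiting for a buffer.
public class BackpressureDemo {

    // Hypothetical buffer count; Flink's real per-channel buffer settings differ.
    static final int BUFFERS_PER_CHANNEL = 4;

    /** Returns how long (ms) the producer needed to hand off all records. */
    public static long run(int records, long consumerDelayMs) {
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(BUFFERS_PER_CHANNEL);

        // Slow downstream operator, e.g. one that makes a blocking call per record.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    channel.take();
                    Thread.sleep(consumerDelayMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        long start = System.nanoTime();
        try {
            for (int i = 0; i < records; i++) {
                // Blocks as soon as all buffers are occupied: the producer is backpressured.
                channel.put(i);
            }
            long producerMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            consumer.join();
            return producerMs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fast consumer: " + run(20, 0) + " ms");
        System.out.println("slow consumer: " + run(20, 50) + " ms");
    }
}
```

With a 50 ms consumer delay and 4 buffers, emitting 20 records takes at least (20 - 4 - 1) × 50 = 750 ms, because the last put() can only succeed after the consumer has taken 16 records.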

I would check the stack traces of the downstream operators to find the cause of the backpressure.
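The usual tool for this is jstack against each TaskManager process. The same search can also be sketched in-process with the standard java.lang.management API, assuming you are able to run code inside the TaskManager JVM; BlockedThreadFinder and threadsWithFrameIn are hypothetical names. Searching for LocalBufferPool finds blocked upstream threads like the one quoted below; the downstream threads that are busy elsewhere (for example inside a slow call in a DoFn) point to the actual cause.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// In-process equivalent of grepping a jstack dump: lists every thread whose
// current stack contains a frame from the given class.
public class BlockedThreadFinder {

    public static List<String> threadsWithFrameIn(String className) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        List<String> matches = new ArrayList<>();
        // (false, false): skip locked monitor/synchronizer details, only stacks are needed.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            for (StackTraceElement frame : info.getStackTrace()) {
                if (frame.getClassName().equals(className)) {
                    matches.add(info.getThreadName() + " [" + info.getThreadState() + "]");
                    break;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Upstream symptom: threads waiting for a free network buffer.
        threadsWithFrameIn("org.apache.flink.runtime.io.network.buffer.LocalBufferPool")
            .forEach(System.out::println);
    }
}
```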

-Max

On 17.09.20 19:50, Deshpande, Omkar wrote:
Flink 1.10
------------------------------------------------------------------------
*From:* Kyle Weaver <[email protected]>
*Sent:* Thursday, September 17, 2020 9:34 AM
*To:* [email protected] <[email protected]>
*Subject:* Re: flink runner 1.10 checkpoint timeout issue

What is the version of your Flink cluster?

On Wed, Sep 16, 2020 at 9:10 PM Deshpande, Omkar <[email protected]> wrote:

    Hello,

    I recently upgraded to beam-flink-runner-1.10:2.23.0 from
    beam-flink-runner-1.9:2.23.0. My application was working as expected
    with the 1.9 runner, but since the upgrade the checkpoints are timing
    out. Even after increasing the timeout significantly, the checkpoints
    keep failing. I looked at the stack dump to check for deadlocks.
    There are none, but this thread seems to have been awaiting
    notification for a long time:

    Legacy Source Thread - Source:
    read/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map ->
    read/Remove Kafka Metadata/ParMultiDo(Anonymous) -> Random key
    assignment SPP/ParMultiDo(RandomPartitioner) -> Window for
    repartitioning SPP/Window.Assign.out -> ToKeyedWorkItem (1/4)
    awaiting notification on [0x00000007b83b7958], holding [0x00000007bc786fd8]
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)


    My application is I/O bound, i.e. every record makes a REST call that
    takes a few seconds to complete.
    I did not face this issue with the 1.9 runner. What has changed in the
    1.10 runner? Any pointers for debugging?

    Omkar
