Awesome. Let me know if you need any other information. Our application makes 
heavy use of event timers and keyed state, and the load is very heavy, if that 
matters.
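
For context, the pattern looks roughly like the sketch below (a minimal, 
hypothetical example; the class name is a placeholder and "Event" stands in 
for our record type, assumed to have getTimestamp()):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch of our usage pattern: keyed state plus one
// event-time timer per key.
public class ExpiringStateFunction extends KeyedProcessFunction<String, Event, Event> {

    private static final long TTL_MS = 60 * 60 * 1000; // illustrative TTL, not our real value

    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        lastSeen.update(event.getTimestamp());
        // the timer fires once the watermark passes event time + TTL
        ctx.timerService().registerEventTimeTimer(event.getTimestamp() + TTL_MS);
        out.collect(event);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        lastSeen.clear(); // expire the state for this key
    }
}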
On Mar 29, 2021, 05:50 -0700, Piotr Nowojski <pnowoj...@apache.org>, wrote:
> Hi Sihan,
>
> Thanks for the information. Previously I was not able to reproduce this 
> issue, but after adding a union I think I can see it happening.
>
> Best,
> Piotrek
>
> > On Fri, 26 Mar 2021 at 22:59, Sihan You <leo.yo...@gmail.com> wrote:
> > > This issue is not always reproducible. It happened 2~3 times over our 
> > > three-month development period.
> > >
> > > > On Fri, Mar 26, 2021 at 2:57 PM Sihan You <leo.yo...@gmail.com> wrote:
> > > > > Hi,
> > > > >
> > > > > Thanks for responding. I'm working in a commercial organization, so I 
> > > > > cannot share the detailed stack with you. I will try to describe the 
> > > > > issue as specifically as I can.
> > > > > <image.png>
> > > > > Above are more detailed stats of our job.
> > > > > 1. How long did the job run until it got stuck?
> > > > > About 9 hours.
> > > > > 2. How often do you checkpoint or how many checkpoints succeeded?
> > > > > I don't remember the exact number of successful checkpoints, but 
> > > > > there should have been around 2. Then the checkpoints started to fail 
> > > > > because of timeouts.
> > > > > 3. What were the typical checkpoint sizes? How much in-flight data 
> > > > > was checkpointed? (A screenshot of the checkpoint tab in the Flink UI 
> > > > > would suffice)
> > > > > The first checkpoint was 5 TB and the second was 578 GB.
> > > > > 4. Was the parallelism of the whole job 5? How is the topology 
> > > > > roughly looking? (e.g., Source -> Map -> Sink?)
> > > > > The source is a union of two source streams: one has a parallelism of 
> > > > > 5 and the other has 80.
> > > > > The job graph is like this (a rough DataStream sketch follows the diagram):
> > > > > source 1.1 (parallelism 5)  -\
> > > > >                               union -\
> > > > > source 1.2 (parallelism 80) -/        \
> > > > >                                        connect -> sink
> > > > > source 2.1 (parallelism 5)  -\        /
> > > > >                               union -/
> > > > > source 2.2 (parallelism 80) -/
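> > > > > In DataStream API terms, the wiring is roughly the following (the 
> > > > > source, function, and sink class names are placeholders, not our 
> > > > > actual code; env is the StreamExecutionEnvironment):
> > > > >
> > > > > DataStream<Event> source11 = env.addSource(new Source11()).setParallelism(5);
> > > > > DataStream<Event> source12 = env.addSource(new Source12()).setParallelism(80);
> > > > > DataStream<Control> source21 = env.addSource(new Source21()).setParallelism(5);
> > > > > DataStream<Control> source22 = env.addSource(new Source22()).setParallelism(80);
> > > > >
> > > > > source11.union(source12)
> > > > >     .connect(source21.union(source22))
> > > > >     .keyBy(e -> e.getKey(), c -> c.getKey())
> > > > >     .process(new OurKeyedCoProcessFunction()) // keyed state + event timers live here
> > > > >     .addSink(new OurSink());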
> > > > > 5. Did you see any warnings/errors in the logs related to 
> > > > > checkpointing and I/O?
> > > > > No errors were thrown.
> > > > > 6. What was your checkpoint storage (e.g. S3)? Is the application 
> > > > > running in the same data-center (e.g. AWS)?
> > > > > We are using HDFS for the state backend and the checkpoint directory.
> > > > > The application is running in our own data center, on Kubernetes, as a 
> > > > > standalone job.
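> > > > > For reference, the setup is roughly equivalent to this sketch (the 
> > > > > path and intervals are placeholders, not our real values; Flink 1.12 
> > > > > APIs):
> > > > >
> > > > > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > env.enableCheckpointing(10 * 60 * 1000);                         // checkpoint interval
> > > > > env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);  // where our timeouts hit
> > > > > env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // HDFS checkpoint dir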
> > > > >
> > > > > > On Fri, Mar 26, 2021 at 7:31 AM Piotr Nowojski 
> > > > > > <pnowoj...@apache.org> wrote:
> > > > > > > Hi Sihan,
> > > > > > >
> > > > > > > More importantly, could you create an example job that can 
> > > > > > > reproduce the problem? It can have some fake sources and no 
> > > > > > > business logic, but if you could provide us with something like 
> > > > > > > that, it would allow us to analyse the problem without going back 
> > > > > > > and forth over tens of questions.
> > > > > > >
> > > > > > > Best, Piotrek
> > > > > > >
> > > > > > > > On Fri, 26 Mar 2021 at 11:40, Arvid Heise <ar...@apache.org> 
> > > > > > > > wrote:
> > > > > > > > > Hi Sihan,
> > > > > > > > >
> > > > > > > > > Thanks for reporting. This looks like a bug to me. I have 
> > > > > > > > > opened an investigation ticket with the highest priority [1].
> > > > > > > > >
> > > > > > > > > Could you please provide some more context, so we have a 
> > > > > > > > > chance to reproduce?
> > > > > > > > > 1. How long did the job run until it got stuck?
> > > > > > > > > 2. How often do you checkpoint or how many checkpoints 
> > > > > > > > > succeeded?
> > > > > > > > > 3. What were the typical checkpoint sizes? How much in-flight 
> > > > > > > > > data was checkpointed? (A screenshot of the checkpoint tab in 
> > > > > > > > > the Flink UI would suffice)
> > > > > > > > > 4. Was the parallelism of the whole job 5? How is the 
> > > > > > > > > topology roughly looking? (e.g., Source -> Map -> Sink?)
> > > > > > > > > 5. Did you see any warnings/errors in the logs related to 
> > > > > > > > > checkpointing and I/O?
> > > > > > > > > 6. What was your checkpoint storage (e.g. S3)? Is the 
> > > > > > > > > application running in the same data-center (e.g. AWS)?
> > > > > > > > >
> > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-21992
> > > > > > > > >
> > > > > > > > > > On Thu, Mar 25, 2021 at 3:00 AM Sihan You 
> > > > > > > > > > <leo.yo...@gmail.com> wrote:
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I keep seeing the following situation where a task is 
> > > > > > > > > > > blocked requesting a MemorySegment from the buffer pool 
> > > > > > > > > > > while the operator still appears to be running.
> > > > > > > > > > >
> > > > > > > > > > > I'm completely stumped as to how to debug or what to look 
> > > > > > > > > > > at next so any hints/help/advice would be greatly 
> > > > > > > > > > > appreciated!
> > > > > > > > > > >
> > > > > > > > > > > The situation is as follows (Flink 1.12.2):
> > > > > > > > > > >  <Attachment.tiff>
> > > > > > > > > > > As you can see, from 02:00 to 08:00 no records are 
> > > > > > > > > > > produced by this purchase source, while there are still a 
> > > > > > > > > > > bunch of records waiting to be processed from Kafka. 
> > > > > > > > > > > During this period, the outPoolUsage is around 0.6 and the 
> > > > > > > > > > > downstream operators also seem to have buffers available. 
> > > > > > > > > > > We redeployed the job and disabled unaligned checkpoints 
> > > > > > > > > > > at around 9:00, so it is back to normal now.
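> > > > > > > > > > >
> > > > > > > > > > > (For completeness, the toggle we flipped on redeploy is 
> > > > > > > > > > > the standard CheckpointConfig switch; shown here as a 
> > > > > > > > > > > sketch, where env is the StreamExecutionEnvironment:)
> > > > > > > > > > >
> > > > > > > > > > > // unaligned checkpoints were enabled before; we turned them off
> > > > > > > > > > > env.getCheckpointConfig().enableUnalignedCheckpoints(false);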
> > > > > > > > > > >
> > > > > > > > > > > The thread dump we took shows that we are stuck here:
> > > > > > > > > > >
> > > > > > > > > > > "Legacy Source Thread - Source: Kafka Reader - ACCOUNT - 
> > > > > > > > > > > kafka-bootstrap-url.com:9443 (1/5)#2" #9250 prio=5 
> > > > > > > > > > > os_prio=0 cpu=5
> > > > > > > > > > > 9490.62ms elapsed=8399.28s tid=0x00007f0e99c23910 
> > > > > > > > > > > nid=0x2df5 waiting on condition [0x00007f0fa85fe000]
> > > > > > > > > > >  java.lang.Thread.State: WAITING (parking)
> > > > > > > > > > >  at jdk.internal.misc.Unsafe.park(java.base@11.0.8/Native 
> > > > > > > > > > > Method)
> > > > > > > > > > >  - parking to wait for <0x00000000ab5527c8> (a 
> > > > > > > > > > > java.util.concurrent.CompletableFuture$Signaller)
> > > > > > > > > > >  at 
> > > > > > > > > > > java.util.concurrent.locks.LockSupport.park(java.base@11.0.8/LockSupport.java:194)
> > > > > > > > > > >  at 
> > > > > > > > > > > java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.8/CompletableFuture.java:1796)
> > > > > > > > > > >  at 
> > > > > > > > > > > java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.8/ForkJoinPool.java:3128)
> > > > > > > > > > >  at 
> > > > > > > > > > > java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.8/CompletableFuture.java:1823)
> > > > > > > > > > >  at 
> > > > > > > > > > > java.util.concurrent.CompletableFuture.get(java.base@11.0.8/CompletableFuture.java:1998)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:337)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
> > > > > > > > > > >  at 
> > > > > > > > > > > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
> > > > > > > > > > >  - locked <0x00000000aef80e00> (a java.lang.Object)
