Hi Sihan,

We managed to reproduce it, see [1]. It will be fixed in the next 1.12
release and in the upcoming 1.13 release.

[1] https://issues.apache.org/jira/browse/FLINK-21992

On Tue, Apr 6, 2021 at 8:45 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Sihan,
>
> Unfortunately, we are unable to reproduce the issue so far. Could you
> please describe in more detail the job graph, in particular what are
> the downstream operators and whether there is any chaining?
>
> Do I understand correctly that Flink returned to normal at around
> 8:00, worked fine for ~3 hours, got stuck again, and was then
> restarted?
>
> I'm also wondering whether requestBufferBuilderBlocking is just a
> frequent operation popping up in the thread dump. Or do you actually see
> that the Legacy Source threads are *stuck* there?
>
> Could you please explain how the other metrics are calculated?
> (PURCHASE KAFKA NUM-SEC, PURCHASE OUTPOOL, PLI PURCHASE JOIN INPOOL).
> Or do you have rate metrics per source?
>
> Regards,
> Roman
>
>
>
> On Wed, Mar 31, 2021 at 1:44 AM Sihan You <leo.yo...@gmail.com> wrote:
> >
> > Awesome. Let me know if you need any other information. Our application
> makes heavy use of event timers and keyed state, and the load is very
> heavy, if that matters.
> > On Mar 29, 2021, 05:50 -0700, Piotr Nowojski <pnowoj...@apache.org>,
> wrote:
> >
> > Hi Sihan,
> >
> > Thanks for the information. Previously I was not able to reproduce this
> issue, but after adding a union I think I can see it happening.
> >
> > Best,
> > Piotrek
> >
> > On Fri, Mar 26, 2021 at 10:59 PM Sihan You <leo.yo...@gmail.com> wrote:
> >>
> >> This issue is not always reproducible. It happened 2~3 times during our
> development period of 3 months.
> >>
> >> On Fri, Mar 26, 2021 at 2:57 PM Sihan You <leo.yo...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> Thanks for responding. I'm working in a commercial organization, so I
> cannot share the detailed stack with you. I will try to describe the issue
> as specifically as I can.
> >>> <image.png>
> >>> Above are more detailed stats of our job.
> >>> 1. How long did the job run until it got stuck?
> >>> About 9 hours.
> >>> 2. How often do you checkpoint or how many checkpoints succeeded?
> >>> I don't remember the exact number of successful checkpoints, but there
> should have been around 2. Then the checkpoints started to fail because of
> the timeout.
> >>> 3. What were the typical checkpoint sizes? How much in-flight data was
> checkpointed? (A screenshot of the checkpoint tab in the Flink UI would
> suffice)
> >>> The first checkpoint was 5 TB and the second was 578 GB.
> >>> 4. Was the parallelism of the whole job 5? How is the topology roughly
> looking? (e.g., Source -> Map -> Sink?)
> >>> The source is a union of two source streams: one has a parallelism of
> 5 and the other has 80.
> >>> The job graph looks roughly like this (a sketch in the DataStream API
> follows below):
> >>>
> >>> source 1.1 (parallelism 5)  ->
> >>>                               union ->
> >>> source 1.2 (parallelism 80) ->
> >>>                                        connect -> sink
> >>> source 2.1 (parallelism 5)  ->
> >>>                               union ->
> >>> source 2.2 (parallelism 80) ->
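> >>>
> >>> For illustration only, a minimal sketch of that topology in the
> DataStream API. Event, ExampleSource, ExampleJoinFunction and ExampleSink
> are placeholders, not our actual classes:
> >>>
> >>>   import org.apache.flink.streaming.api.datastream.DataStream;
> >>>   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >>>
> >>>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >>>
> >>>   // two pairs of sources; each pair is unioned into one logical stream
> >>>   DataStream<Event> source11 = env.addSource(new ExampleSource()).setParallelism(5);
> >>>   DataStream<Event> source12 = env.addSource(new ExampleSource()).setParallelism(80);
> >>>   DataStream<Event> source21 = env.addSource(new ExampleSource()).setParallelism(5);
> >>>   DataStream<Event> source22 = env.addSource(new ExampleSource()).setParallelism(80);
> >>>
> >>>   DataStream<Event> union1 = source11.union(source12);
> >>>   DataStream<Event> union2 = source21.union(source22);
> >>>
> >>>   // the two unioned streams are connected, keyed, joined (keyed state
> >>>   // and event timers live in the join), and written to the sink
> >>>   union1.connect(union2)
> >>>         .keyBy(Event::getKey, Event::getKey)
> >>>         .process(new ExampleJoinFunction())   // a KeyedCoProcessFunction-style operator
> >>>         .addSink(new ExampleSink());
> >>>
> >>>   env.execute("topology-sketch");
> >>>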
> >>> 5. Did you see any warns/errors in the logs related to checkpointing
> and I/O?
> >>> No errors were thrown.
> >>> 6. What was your checkpoint storage (e.g. S3)? Is the application
> running in the same data-center (e.g. AWS)?
> >>> We are using HDFS for the state backend and the checkpoint directory.
> >>> The application is running in our own data center, on Kubernetes, as a
> standalone job.
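> >>>
> >>> For reference, a minimal sketch of that setup, assuming the filesystem
> state backend; the path and values are placeholders, not our real
> configuration. The same can be set in flink-conf.yaml via
> state.checkpoints.dir and execution.checkpointing.timeout:
> >>>
> >>>   import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> >>>   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >>>
> >>>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >>>   // HDFS-backed state backend and checkpoint directory (path is a placeholder)
> >>>   env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
> >>>   env.enableCheckpointing(60_000L);                          // checkpoint interval, illustrative
> >>>   env.getCheckpointConfig().setCheckpointTimeout(600_000L);  // checkpoint timeout, illustrative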
> >>>
> >>> On Fri, Mar 26, 2021 at 7:31 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
> >>>>
> >>>> Hi Sihan,
> >>>>
> >>>> More importantly, could you create some example job that can
> reproduce that problem? It can have some fake sources and no business
> logic, but if you could provide us with something like that, it would allow
> us to analyse the problem without going back and forth with tens of
> questions.
> >>>>
> >>>> Best, Piotrek
> >>>>
> >>>> On Fri, Mar 26, 2021 at 11:40 AM Arvid Heise <ar...@apache.org> wrote:
> >>>>>
> >>>>> Hi Sihan,
> >>>>>
> >>>>> Thanks for reporting. This looks like a bug to me. I have opened an
> investigation ticket with the highest priority [1].
> >>>>>
> >>>>> Could you please provide some more context, so we have a chance to
> reproduce?
> >>>>> 1. How long did the job run until it got stuck?
> >>>>> 2. How often do you checkpoint or how many checkpoints succeeded?
> >>>>> 3. What were the typical checkpoint sizes? How much in-flight data
> was checkpointed? (A screenshot of the checkpoint tab in the Flink UI would
> suffice)
> >>>>> 4. Was the parallelism of the whole job 5? How is the topology
> roughly looking? (e.g., Source -> Map -> Sink?)
> >>>>> 5. Did you see any warns/errors in the logs related to checkpointing
> and I/O?
> >>>>> 6. What was your checkpoint storage (e.g. S3)? Is the application
> running in the same data-center (e.g. AWS)?
> >>>>>
> >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21992
> >>>>>
> >>>>> On Thu, Mar 25, 2021 at 3:00 AM Sihan You <leo.yo...@gmail.com>
> wrote:
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I keep seeing the following situation where a task is blocked
> getting a MemorySegment from the pool but the operator is still reporting.
> >>>>>>
> >>>>>> I'm completely stumped as to how to debug or what to look at next
> so any hints/help/advice would be greatly appreciated!
> >>>>>>
> >>>>>> The situation is as follows (Flink 1.12.2):
> >>>>>>  <Attachment.tiff>
> >>>>>> As you can see, from 02:00 to 08:00 no records were produced by this
> purchase source, while there were still a bunch of records from Kafka
> waiting to be processed. During this period, the outPoolUsage was around
> 0.6 and the downstream operators also seemed to have buffers available. We
> redeployed the job and disabled unaligned checkpoints at around 9:00 (see
> the sketch below), so it is back to normal now.
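> >>>>>>
> >>>>>> For context, a minimal sketch of the switch we flipped on redeploy
> (illustrative only, not our actual job code); the equivalent
> flink-conf.yaml key is execution.checkpointing.unaligned:
> >>>>>>
> >>>>>>   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >>>>>>
> >>>>>>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >>>>>>   env.enableCheckpointing(60_000L);  // interval is a placeholder value
> >>>>>>   // previously enabled; switched off after the job got stuck
> >>>>>>   env.getCheckpointConfig().enableUnalignedCheckpoints(false);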
> >>>>>>
> >>>>>> The thread dump we took shows that we are stuck here:
> >>>>>>
> >>>>>> "Legacy Source Thread - Source: Kafka Reader - ACCOUNT -
> kafka-bootstrap-url.com:9443 (1/5)#2" #9250 prio=5 os_prio=0 cpu=5
> >>>>>> 9490.62ms elapsed=8399.28s tid=0x00007f0e99c23910 nid=0x2df5
> waiting on condition [0x00007f0fa85fe000]
> >>>>>>  java.lang.Thread.State: WAITING (parking)
> >>>>>>  at jdk.internal.misc.Unsafe.park(java.base@11.0.8/Native Method)
> >>>>>>  - parking to wait for <0x00000000ab5527c8> (a
> java.util.concurrent.CompletableFuture$Signaller)
> >>>>>>  at java.util.concurrent.locks.LockSupport.park(java.base@11.0.8
> /LockSupport.java:194)
> >>>>>>  at
> java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.8
> /CompletableFuture.java:1796)
> >>>>>>  at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.8
> /ForkJoinPool.java:3128)
> >>>>>>  at
> java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.8
> /CompletableFuture.java:1823)
> >>>>>>  at java.util.concurrent.CompletableFuture.get(java.base@11.0.8
> /CompletableFuture.java:1998)
> >>>>>>  at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> >>>>>>  at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> >>>>>>  at org.apache.flink.runtime.io
> .network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:337)
> >>>>>>  at org.apache.flink.runtime.io
> .network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
> >>>>>>  at org.apache.flink.runtime.io
> .network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
> >>>>>>  at org.apache.flink.runtime.io
> .network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
> >>>>>>  at org.apache.flink.runtime.io
> .network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> >>>>>>  at org.apache.flink.runtime.io
> .network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
> >>>>>>  at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
> >>>>>>  at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
> >>>>>>  at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
> >>>>>>  at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> >>>>>>  at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> >>>>>>  at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
> >>>>>>  at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
> >>>>>>  - locked <0x00000000aef80e00> (a java.lang.Object)
>
