Re: Blocked requesting MemorySegment when Segments are available.

2020-06-10 Thread Zhijiang
Hi David,

I would first like to clarify two things based on the information you provided
below.

1. If all the tasks are running on the same TaskManager, there is no
credit-based flow control. The downstream operator consumes the upstream's data
directly in memory, so no network shuffle is needed.
2. Available buffers on the TaskManager do not mean that an individual task has
available buffers on its input or output side. E.g. the output side of the
"enrich-events" operator has a maximum of 10 buffers. Once these buffers are
exhausted, the operator is blocked regardless of how many buffers are available
at the TaskManager level.
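
To illustrate point 2, here is a minimal toy model in Java. It is not Flink's
actual buffer pool code: the classes ToyGlobalPool and ToyLocalPool and their
methods are made up for this sketch, and the numbers (6,553 segments in total,
a cap of 10 on the output side) are simply the ones mentioned in this thread.

```java
/** Toy stand-in for the TaskManager-wide segment pool (hypothetical, not a Flink class). */
final class ToyGlobalPool {
    private int available;

    ToyGlobalPool(int totalSegments) {
        this.available = totalSegments;
    }

    /** Hands out one segment if any is left. */
    synchronized boolean tryTake() {
        if (available == 0) {
            return false;
        }
        available--;
        return true;
    }

    synchronized void giveBack() {
        available++;
    }

    synchronized int available() {
        return available;
    }
}

/** Toy per-task pool that may never hold more than maxBuffers segments at once. */
final class ToyLocalPool {
    private final ToyGlobalPool global;
    private final int maxBuffers;
    private int inUse;

    ToyLocalPool(ToyGlobalPool global, int maxBuffers) {
        this.global = global;
        this.maxBuffers = maxBuffers;
    }

    /** Returns false once the local cap is reached, even if the global pool has spare segments. */
    synchronized boolean tryRequest() {
        if (inUse >= maxBuffers) {
            return false; // a blocking request would park the task thread at this point
        }
        if (!global.tryTake()) {
            return false;
        }
        inUse++;
        return true;
    }

    /** Called when a downstream consumer has read a buffer and it is recycled. */
    synchronized void recycle() {
        if (inUse == 0) {
            throw new IllegalStateException("nothing to recycle");
        }
        inUse--;
        global.giveBack();
    }
}

final class LocalPoolDemo {
    public static void main(String[] args) {
        ToyGlobalPool global = new ToyGlobalPool(6553);
        ToyLocalPool output = new ToyLocalPool(global, 10);
        for (int i = 0; i < 10; i++) {
            output.tryRequest();
        }
        System.out.println("global available: " + global.available());    // 6543
        System.out.println("11th request granted: " + output.tryRequest()); // false
        output.recycle(); // only a recycled buffer unblocks the writer
        System.out.println("after recycle: " + output.tryRequest());       // true
    }
}
```

In this model the 11th request fails although thousands of segments are still
free globally; only a recycled buffer on the local side lets the writer go on.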

For your case, could you double-check whether buffers have accumulated in the
output (the "outputQueueLength" metric) of the "enrich-events" operator, and
whether the "numRecordsIn/numBytesIn" metric of the "Test Function" operator is
greater than 0? I want to rule out a buffer leak on the upstream side and a
missing partition request on the downstream side. Then we can investigate
further whether the input availability notification on the downstream side has
a bug that makes it stuck forever.
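
If it helps, these metrics can also be read over the REST interface instead of
the web UI. Below is a small Java sketch that only builds the request URL. The
path layout and the metric ids "buffers.outputQueueLength" and "numRecordsIn"
are assumptions that should be verified against your Flink version (calling the
same endpoint without the "get" parameter lists the ids that really exist).
JOB_ID and VERTEX_ID are placeholders, and MetricsQuery is a made-up helper.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that builds a subtask metrics URL for the Flink REST interface. */
final class MetricsQuery {

    /**
     * Assumed path layout: /jobs/{job}/vertices/{vertex}/subtasks/{index}/metrics.
     * An empty id list yields the plain listing URL without a "get" filter.
     */
    static URI subtaskMetrics(
            String restBase, String jobId, String vertexId, int subtask, List<String> metricIds) {
        String path = String.format(
                "%s/jobs/%s/vertices/%s/subtasks/%d/metrics", restBase, jobId, vertexId, subtask);
        String query = metricIds.isEmpty() ? "" : "?get=" + String.join(",", metricIds);
        return URI.create(path + query);
    }

    public static void main(String[] args) {
        // Upstream side: are buffers piling up in the output queue? (metric id is an assumption)
        System.out.println(subtaskMetrics(
                "http://localhost:8081", "JOB_ID", "VERTEX_ID", 0,
                Arrays.asList("buffers.outputQueueLength")));
        // Downstream side: has the task received anything at all? (metric id is an assumption)
        System.out.println(subtaskMetrics(
                "http://localhost:8081", "JOB_ID", "VERTEX_ID", 0,
                Arrays.asList("numRecordsIn")));
    }
}
```

The printed URLs can then be fetched with any HTTP client, once for the vertex
of the upstream operator and once for the vertex of the downstream operator.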

Best,
Zhijiang


--
From:David Maddison 
Send Time:June 9, 2020 (Tuesday) 19:28
To:user 
Subject:Blocked requesting MemorySegment when Segments are available.

Hi,

I keep seeing the following situation where a task is blocked getting a 
MemorySegment from the pool but the TaskManager is reporting that it has lots 
of MemorySegments available.  

I'm completely stumped as to how to debug or what to look at next so any 
hints/help/advice would be greatly appreciated!

/David/

The situation is as follows (Flink 1.10.0):

I have two operations, the first one "enrich-events" is stuck forever
attempting to get a memory segment to send to downstream operator "Test
function":

"read-events -> enriched-events (1/1)" #61 prio=5 os_prio=0 tid=0x7f6424091800 nid=0x13b waiting on condition [0x7f644acf]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0xd2206000> (a java.util.concurrent.CompletableFuture$Signaller)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
 at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
 at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
 at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
 at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
 at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
 at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
 at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
 at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
 at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)


All the operator tasks are running on the same TaskManager and the TaskManager 
reports that it has 6,517 memory segments available, so it's confusing why the 
task would be blocked getting a memory segment.

Memory Segments
Type  Count
Available  6,517
Total  6,553

Even more confusing is that the downstream task appears to be waiting for data
and therefore I would assume that the credit based flow control isn't causing
the back pressure.

"Test Function (1/1)" #62 prio=5 os_prio=0 tid=0x7f6424094000 nid=0x13c waiting on condition [0x7f644abf]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0xc91de1d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
 at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
 at 

Blocked requesting MemorySegment when Segments are available.

2020-06-09 Thread David Maddison
Hi,

I keep seeing the following situation where a task is blocked getting a
MemorySegment from the pool but the TaskManager is reporting that it has
lots of MemorySegments available.

I'm completely stumped as to how to debug or what to look at next so any
hints/help/advice would be greatly appreciated!

/David/

The situation is as follows (Flink 1.10.0):

I have two operations, the first one "enrich-events" is stuck forever
attempting to get a memory segment to send to downstream operator "Test
function":

"read-events -> enriched-events (1/1)" #61 prio=5 os_prio=0 tid=0x7f6424091800 nid=0x13b waiting on condition [0x7f644acf]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xd2206000> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)


All the operator tasks are running on the same TaskManager and the
TaskManager reports that it has 6,517 memory segments available, so it's
confusing why the task would be blocked getting a memory segment.

Memory Segments
Type  Count
Available  6,517
Total  6,553

Even more confusing is that the downstream task appears to be waiting for
data and therefore I would assume that the credit based flow control isn't
causing the back pressure.

"Test Function (1/1)" #62 prio=5 os_prio=0 tid=0x7f6424094000 nid=0x13c waiting on condition [0x7f644abf]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xc91de1d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)