Hi community,

Looking at the code [1], the "Buffer pool is destroyed" error (stack trace
below) seems to be related to the pool no longer having
availableMemorySegments. I have looked at several metrics, but none of them
has helped me understand where I can measure the root cause of this error
message.

- flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments does not
seem to point to a cause.
- flink_taskmanager_job_task_Shuffle_Netty_Output_Buffers_inputQueueLength:
my reducer operator always shows a queue length of 4. The pre-aggregate
task sometimes goes to 3, but only a few times.
- flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outPoolUsage and
flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outputQueueLength
show my source task at 100% several times. But my error message comes from
the pre-aggregate task.
- flink_taskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocalPerSecond
DOES show that the pre-aggregate task is consuming a lot. But which metric
can I compare this against to know in advance how much is a lot?

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java#L265

Thanks for your suggestions and here is my stack trace:

java.lang.RuntimeException: Buffer pool is destroyed.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.streaming.examples.aggregate.WordCountPreAggregate$WordCountPreAggregateFunction.collect(WordCountPreAggregate.java:251)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamPreAggregateOperator.collect(AbstractUdfStreamPreAggregateOperator.java:84)
    at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.collect(PreAggregateTriggerFunction.java:49)
    at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.run(PreAggregateTriggerFunction.java:63)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:264)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:240)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:215)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:182)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:131)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
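
To make the trace easier to discuss, here is a minimal sketch (not Flink code) of the call pattern I read from it: a java.util.Timer thread asks a pool for a buffer after the pool was already destroyed. ToyBufferPool, its methods, and the "toy-trigger" timer name are hypothetical stand-ins that I made up for illustration. Whether this ordering is what really happens in my job is only an assumption.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class TimerAfterDestroySketch {

    /** Hypothetical stand-in for a buffer pool; this is NOT Flink's LocalBufferPool. */
    static final class ToyBufferPool {
        private volatile boolean destroyed;

        void destroy() {
            destroyed = true;
        }

        byte[] requestSegment() {
            // Assumed behaviour: a destroyed pool refuses every further request.
            if (destroyed) {
                throw new IllegalStateException("Buffer pool is destroyed.");
            }
            return new byte[32];
        }
    }

    /**
     * Destroys the pool first, then lets a timer thread request a segment.
     * Returns the failure message seen by the timer thread, or null if none.
     */
    static String emitFromTimerAfterDestroy() {
        ToyBufferPool pool = new ToyBufferPool();
        AtomicReference<String> failure = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Timer timer = new Timer("toy-trigger", true); // daemon timer thread

        pool.destroy(); // the pool goes away before the timer fires

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    pool.requestSegment(); // late emit from the timer thread
                } catch (IllegalStateException e) {
                    failure.set(e.getMessage());
                } finally {
                    done.countDown();
                }
            }
        }, 10L);

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.cancel();
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(emitFromTimerAfterDestroy());
    }
}
```

If this reading is right, the metrics above would not necessarily show anything unusual, because the failure would depend on the pool's lifecycle rather than on how many segments are in use.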

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*
