We see that this JIRA issue (FLINK-10491) is marked as fixed, and the fix is
present in Flink v1.6.4, which we are now using. However, the problem now seems
to come up in relatively simpler scenarios as well. Deadlock dump below:
Java stack information for the threads listed above:
===================================================
"CoGroup (2/2)":
at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
- waiting to lock <0x000000062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
- waiting to lock <0x000000063fdf4888> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
- locked <0x000000063fdf4ac8> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
- locked <0x000000063c785350> (a java.lang.Object)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
- locked <0x000000062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"DataSource (1/1)":
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
- waiting to lock <0x000000063fdf4ac8> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
- locked <0x000000063fdf4888> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Found 1 deadlock.
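Reading the dump above, "CoGroup (1/2)" holds the ArrayDeque <0x000000063fdf4ac8>
locked in SpillableSubpartition.releaseMemory and waits for the ArrayDeque
<0x000000063fdf4888> in LocalBufferPool.recycle, while "DataSource (1/1)" holds
the second and waits for the first. As a minimal sketch of that pattern (plain
JDK only, not Flink code; the class, thread and lock names are hypothetical),
the following reproduces a two-lock ordering inversion and detects it with
ThreadMXBean.findMonitorDeadlockedThreads(), similar in spirit to the deadlock
section of a thread dump:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class LockOrderDeadlockSketch {

    // Starts a daemon thread that takes `first`, waits until both threads
    // hold their first lock, and then tries to take `second`.
    static Thread start(String name, Object first, Object second,
                        CountDownLatch bothHoldFirst) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                bothHoldFirst.countDown();
                try {
                    bothHoldFirst.await();
                } catch (InterruptedException e) {
                    return;
                }
                synchronized (second) {
                    // Never reached: the other thread already holds `second`.
                }
            }
        }, name);
        t.setDaemon(true); // lets the JVM exit although the thread stays blocked
        t.start();
        return t;
    }

    // Returns how many of the two sketch threads the JVM reports as deadlocked.
    static int countDeadlockedThreads() {
        // Hypothetical stand-ins for the two ArrayDeque monitors in the dump.
        Object poolLock = new Object();
        Object subpartitionLock = new Object();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);

        // Opposite acquisition order in the two threads creates the cycle.
        Thread recycler = start("recycler", subpartitionLock, poolLock, bothHoldFirst);
        Thread requester = start("requester", poolLock, subpartitionLock, bothHoldFirst);

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] ids = mx.findMonitorDeadlockedThreads(); // null if none found
            int found = 0;
            if (ids != null) {
                for (long id : ids) {
                    if (id == recycler.getId() || id == requester.getId()) {
                        found++;
                    }
                }
            }
            if (found == 2) {
                return found;
            }
            try {
                Thread.sleep(50); // give both threads time to block
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return found;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked sketch threads: " + countDeadlockedThreads());
    }
}
```

In this sketch, having both threads take the two locks in the same order
removes the cycle; whether and how that applies to the Flink code paths in the
dump is not something the sketch can show.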
We are not setting any slot sharing parameters, since this is batch-based
processing, so it uses the default (and there don’t seem to be any options
available to manipulate slot sharing for non-streaming jobs).
If we disable slot sharing (assuming that would be done through some job-wide
config), wouldn’t the job become slower?
Thanks,
Krishna.
From: Zhijiang(wangzhijiang999) <[email protected]>
Sent: Monday, October 08, 2018 1:39 PM
To: Aljoscha Krettek <[email protected]>; Piotr Nowojski
<[email protected]>
Cc: Narayanaswamy, Krishna [Tech] <[email protected]>; Nico
Kruber <[email protected]>; [email protected]
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when
running a large job > 10k tasks
This deadlock does indeed exist in special scenarios.
Until the bug is fixed, we can work around the issue by not deploying the map
and sink tasks in the same task manager.
Krishna, do these two tasks share a slot? If so, you can disable slot sharing
for this job.
Alternatively, I guess we can set ExecutionMode#PIPELINED_FORCED so that no
blocking result partitions are generated, which would avoid this issue
temporarily.
Best,
Zhijiang
------------------------------------------------------------------
From: Piotr Nowojski <[email protected]>
Sent: Thursday, October 4, 2018 21:54
To: Aljoscha Krettek <[email protected]>
Cc: "Narayanaswamy, Krishna" <[email protected]>; Nico
Kruber <[email protected]>; [email protected]
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when
running a large job > 10k tasks
Hi,
Thanks for reporting the problem. This bug was previously unknown to us. I have
created a jira ticket for this bug:
https://issues.apache.org/jira/browse/FLINK-10491
Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t
know if there is some hot fix or anything that can at least mitigate/decrease
the probability of the bug for you until we fix it properly.
Piotrek
On 4 Oct 2018, at 13:55, Aljoscha Krettek
<[email protected]> wrote:
Hi,
this looks like a potential Flink bug. Looping in Nico and Piotr who have
looked into that in the past. Could you please comment on that?
Best,
Aljoscha
On 3. Oct 2018, at 12:12, Narayanaswamy, Krishna
<[email protected]> wrote:
Hi,
I am trying to run a single large job graph with more than 10k tasks. The graph
looks something like this:
DataSource -> Filter -> Map [...multiple]
· Sink1
· Sink2
I am using a parallelism of 10 with 1 slot per task manager and a memory
allocation of 32G per TM. The JM is running with 8G.
Everything starts up and runs fine, with roughly 6-7k tasks completing (the
number varies, and these are mostly the source/filter/map portions), and then
the graph just hangs. I managed to connect to the task managers and get a
thread dump just in time, and found the following deadlock on one of the TMs,
which seems to be holding up everything else.
Please could someone take a look and advise if there is something I could do or
try out to fix this.
Below are the two isolated thread stacks involved in the deadlock:
Thread-1
"DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA waiting for monitor entry
waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
- locked <0x2dfd> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
- locked <0x2da5> (a java.lang.Object)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
Thread-2
"Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor entry
java.lang.Thread.State: BLOCKED
blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release lock on <0x2dfd> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171)
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:106)
at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:146)
at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:110)
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:271)
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:117)
- locked <0x2dfb> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:96)
- locked <0x2dfc> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
at org.apache.flink.runtime.io.network.partition.ResultPartition.addBufferConsumer(ResultPartition.java:255)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:211)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:142)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
Thanks,
Krishna.
________________________________
Your Personal Data: We may collect and process information about you that may
be subject to data protection laws. For more information about how we use and
disclose your personal data, how we protect your information, our legal basis
to use your information, your rights and who you can contact, please refer to:
www.gs.com/privacy-notices