1、建议问题别同时发到三个邮件去
2、找找还有没有更加明显的异常日志

刘建刚 <[email protected]> 于2020年1月22日周三 上午10:25写道:

> I am using flink 1.6.2 on yarn. State backend is rocksdb.
>
> > 2020年1月22日 上午10:15,刘建刚 <[email protected]> 写道:
> >
> >       I have a flink job which fails occasionally. I am eager to avoid
> this problem. Can anyone help me? The error stacktrace is as following:
> > java.io.IOException: java.lang.StackOverflowError
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.InputChannel.checkError(InputChannel.java:191)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:194)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:589)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:546)
> >       at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:175)
> >       at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:236)
> >       at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> >       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335)
> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:754)
> >       at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.StackOverflowError
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:656)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.InputChannel.setError(InputChannel.java:203)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:403)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >       at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
> >       at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
> >       at org.apache.flink.runtime.io
> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
> >       at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
> >
>
>

回复