org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager '/xxxxxx:14941'.
        at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:276)
        at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:182)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at org.apache.flink.runtime.io.network.netty.ZeroCopyNettyMessageDecoder.channelRead(ZeroCopyNettyMessageDecoder.java:128)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.nio.channels.ClosedChannelException
        at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:297)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:119)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.addCredit(PartitionRequestQueue.java:181)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:140)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:43)
        at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        ... 13 more
Caused by: org.apache.flink.runtime.io.network.partition.DataConsumptionException: java.nio.channels.ClosedChannelException
        at org.apache.flink.runtime.io.network.partition.SpilledSubpartitionView.getNextBuffer(SpilledSubpartitionView.java:150)
        at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.getNextBuffer(CreditBasedSequenceNumberingViewReader.java:162)





 ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  - Encountered error while consuming partitions
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:241)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
.......

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithOneInputStreamOperatorOutput.pushToOperator(OperatorChain.java:638)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithOneInputStreamOperatorOutput.collect(OperatorChain.java:612)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithOneInputStreamOperatorOutput.collect(OperatorChain.java:575)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:763)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:741)
        at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator.collect(HashJoinOperator.java:202)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator.innerJoin(HashJoinOperator.java:187)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator$InnerHashJoinOperator.join(HashJoinOperator.java:310)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator.joinWithNextKey(HashJoinOperator.java:181)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator.processElement2(HashJoinOperator.java:159)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$TwoInputStreamOperatorProxy.processElement2(OperatorChain.java:1389)
        at org.apache.flink.streaming.runtime.io.SecondOfTwoInputProcessor.processRecord(SecondOfTwoInputProcessor.java:91)
        at org.apache.flink.streaming.runtime.io.InputGateFetcher.fetchAndProcess(InputGateFetcher.java:159)
        at org.apache.flink.streaming.runtime.io.StreamArbitraryInputProcessor.process(StreamArbitraryInputProcessor.java:134)
        at org.apache.flink.streaming.runtime.tasks.ArbitraryInputStreamTask.run(ArbitraryInputStreamTask.java:183)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:324)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:727)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithSecondInputOfTwoInputStreamOperatorOutput.pushToOperator(OperatorChain.java:850)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithSecondInputOfTwoInputStreamOperatorOutput.collect(OperatorChain.java:824)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithSecondInputOfTwoInputStreamOperatorOutput.collect(OperatorChain.java:787)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:763)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:741)
        at BatchExecCalcRule$64.processElement(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithOneInputStreamOperatorOutput.pushToOperator(OperatorChain.java:635)
        ... 18 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithOneInputStreamOperatorOutput.pushToOperator(OperatorChain.java:638)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithOneInputStreamOperatorOutput.collect(OperatorChain.java:612)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithOneInputStreamOperatorOutput.collect(OperatorChain.java:575)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:763)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:741)
        at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator.collect(HashJoinOperator.java:202)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator.innerJoin(HashJoinOperator.java:187)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator$InnerHashJoinOperator.join(HashJoinOperator.java:310)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator.joinWithNextKey(HashJoinOperator.java:181)
        at org.apache.flink.table.runtime.join.batch.HashJoinOperator.processElement2(HashJoinOperator.java:159)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$TwoInputStreamOperatorProxy.processElement2(OperatorChain.java:1389)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithSecondInputOfTwoInputStreamOperatorOutput.pushToOperator(OperatorChain.java:847)
        ... 24 more

Caused by: java.lang.RuntimeException
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:96)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:76)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:42)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:763)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:741)
        at BatchExecCalcRule$82.processElement(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingWithOneInputStreamOperatorOutput.pushToOperator(OperatorChain.java:635)
        ... 36 more
Caused by: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:250)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:207)
        at org.apache.flink.runtime.io.network.partition.InternalResultPartition.requestNewBufferBuilder(InternalResultPartition.java:448)
        at org.apache.flink.runtime.io.network.partition.InternalResultPartition.copyFromSerializerToTargetChannel(InternalResultPartition.java:526)
        at org.apache.flink.runtime.io.network.partition.InternalResultPartition.emitRecord(InternalResultPartition.java:231)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:93)
        ... 42 more

回复