Hi, I see that the JM and TM failures are different (the TM one is actually a warning). Could you please share the ERROR message from the TM?
Have you tried increasing taskmanager.network.retries [1]?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries

Regards,
Roman

On Fri, Apr 30, 2021 at 11:55 PM Sihan You <leo.yo...@gmail.com> wrote:
>
> Hi,
>
> We are experiencing a Netty issue with our Flink cluster, and we
> couldn't figure out the cause.
>
> Below are the exception stack traces from the TM's and the JM's perspectives. We
> have 85 TMs and one JM in HA mode. The strange thing is that only 23 of the
> TMs are complaining about the connection issue. When this exception occurs,
> the TM they are complaining about is still up and alive. This causes our
> job to be stuck in a restart loop for a couple of hours, after which it returns to normal.
>
> We are using HDFS as the state backend and for the checkpoint dir.
> The application runs in our own data center, on Kubernetes, as a
> standalone job.
>
>
> ## Job Graph
>
> The job graph looks like this:
> source 1.1 (5 parallelism)  ->
>                                union ->
> source 1.2 (80 parallelism) ->
>                                          connect -> sink
> source 2.1 (5 parallelism)  ->
>                                union ->
> source 2.2 (80 parallelism) ->
>
>
> ## JM's Stacktrace
>
> ```
> message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 100.98.115.117 (dataPort=41245)."
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed.
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.nio.channels.ClosedChannelException
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 18 more
> ```
>
> ## TM's stacktrace
> ```
> timestamp="2021-04-30 20:23:25,401", level="WARN", thread="PLI Deduplicate Operator (6/80)#6", class="org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory", method="connectWithRetries(line:121)", message="Failed 1 times to connect to /100.98.115.117:41245. Retrying."
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/100.98.115.117:41245' has failed. This might indicate that the remote task manager has been lost.
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:81) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:179) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:321) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.<init>(NettyPartitionRequestClient.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 16 more
>
> ```
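
Some context on the retry suggestion. The TM's WARN line is logged from `PartitionRequestClientFactory.connectWithRetries` ("Failed 1 times to connect ... Retrying."). According to the config docs [1], `taskmanager.network.retries` sets the number of retry attempts used when establishing input/output channel connections. The following Java code is a hypothetical, simplified sketch of such a bounded retry loop. It is not Flink's implementation. `ConnectAttempt`, `connectWithRetries` and the simulated remote are made up for illustration. Details such as backoff and exactly how attempts are counted should be checked against the Flink docs and source.

```java
import java.io.IOException;

/**
 * Hypothetical, simplified sketch of a bounded connect-with-retries loop.
 * This is NOT Flink's PartitionRequestClientFactory; it only illustrates
 * how a retry budget (like taskmanager.network.retries) changes behavior.
 */
public class ConnectWithRetriesSketch {

    /** Hypothetical stand-in for a single connection attempt. */
    @FunctionalInterface
    public interface ConnectAttempt<T> {
        T connect() throws IOException;
    }

    /**
     * Makes one initial attempt plus up to {@code retries} further attempts.
     * The most recent failure is rethrown once the retry budget is exhausted.
     */
    public static <T> T connectWithRetries(ConnectAttempt<T> attempt, int retries)
            throws IOException {
        int failures = 0;
        while (true) {
            try {
                return attempt.connect();
            } catch (IOException e) {
                failures++;
                if (failures > retries) {
                    throw e; // budget used up: surface the failure to the caller
                }
                System.err.println("Failed " + failures + " times to connect. Retrying.");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        // Simulated remote that refuses the first two attempts, then succeeds.
        ConnectAttempt<String> flaky = () -> {
            calls[0]++;
            if (calls[0] <= 2) {
                throw new IOException("connection refused (simulated)");
            }
            return "connected";
        };
        // With 3 retries the two transient failures are absorbed.
        System.out.println(connectWithRetries(flaky, 3)); // prints "connected"
    }
}
```

Retrying only helps if the failure is transient. In the sketch, a remote that fails on every attempt still surfaces its last exception once the retry budget is used up. The real option is a TaskManager setting. In a standalone deployment, it belongs in the Flink configuration the TMs start with (e.g. `flink-conf.yaml`), not in job code.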