Hi,

I see that the JM and TM failures are different (the TM one is actually a
warning). Could you please share the ERROR message from the TM?

Have you tried increasing taskmanager.network.retries [1]?
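
For illustration only, here is a minimal sketch of what a bounded retry count
means. It assumes the option simply allows that many additional connection
attempts after the first failure before the exception is propagated and the
task fails. The class and method names below are hypothetical, and this is
not Flink's PartitionRequestClientFactory code:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/**
 * Hypothetical illustration of a bounded connect-with-retries loop.
 * NOT Flink's implementation: it only shows that a "retries" value of N
 * permits up to 1 + N attempts before the last failure is rethrown.
 */
public class RetrySketch {

    /** Calls {@code attempt} up to 1 + maxRetries times; rethrows the last IOException. */
    public static <T> T connectWithRetries(Callable<T> attempt, int maxRetries) throws Exception {
        int failures = 0;
        while (true) {
            try {
                return attempt.call();
            } catch (IOException e) {
                failures++;
                if (failures > maxRetries) {
                    throw e; // retries exhausted: surface the failure to the caller
                }
                System.err.println("Failed " + failures + " times to connect. Retrying.");
            }
            // Any non-IOException propagates immediately without a retry.
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated endpoint that refuses the first two connection attempts.
        int[] calls = {0};
        Callable<String> flaky = () -> {
            calls[0]++;
            if (calls[0] <= 2) {
                throw new IOException("connection refused (simulated)");
            }
            return "connected";
        };
        // With 3 retries allowed, the third attempt succeeds.
        System.out.println(connectWithRetries(flaky, 3));
    }
}
```

In an actual deployment, the option itself goes into flink-conf.yaml, e.g.
`taskmanager.network.retries: 3` (the value 3 is only an example, not a
recommendation).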

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries

Regards,
Roman

On Fri, Apr 30, 2021 at 11:55 PM Sihan You <leo.yo...@gmail.com> wrote:
>
> Hi,
>
> We are experiencing a Netty issue in our Flink cluster and haven't been
> able to figure out its cause.
>
> Below are the exception stack traces from the TM's and the JM's perspectives.
> We have 85 TMs and one JM in HA mode. The strange thing is that only 23 of
> the TMs are complaining about the connection issue. When this exception
> occurs, the TM they are complaining about is still up and alive. This causes
> our job to be stuck in a restart loop for a couple of hours before it
> returns to normal.
>
> We are using HDFS for the state backend and the checkpoint directory.
> The application runs as a standalone job on Kubernetes in our own data
> center.
>
>
> ## Job Graph
>
> The job graph looks like this:
> ```
> source 1.1 (parallelism 5)  --+
>                               +--> union --+
> source 1.2 (parallelism 80) --+            |
>                                            +--> connect --> sink
> source 2.1 (parallelism 5)  --+            |
>                               +--> union --+
> source 2.2 (parallelism 80) --+
> ```
>
>
> ## JM's Stacktrace
>
> ```
> message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 100.98.115.117 (dataPort=41245)."
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed.
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.nio.channels.ClosedChannelException
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 18 more
> ```
>
> ## TM's Stacktrace
>
> ```
> timestamp="2021-04-30 20:23:25,401", level="WARN", thread="PLI Deduplicate Operator (6/80)#6", class="org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory", method="connectWithRetries(line:121)", message="Failed 1 times to connect to /100.98.115.117:41245. Retrying."
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/100.98.115.117:41245' has failed. This might indicate that the remote task manager has been lost.
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:81) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:179) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:321) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.<init>(NettyPartitionRequestClient.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 16 more
>
> ```
