[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.
[ https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744645#comment-17744645 ] Piotr Nowojski commented on FLINK-19249: Hi [~Jiangang], no sorry there was no progress on that issue. > Detect broken connections in case TCP Timeout takes too long. > - > > Key: FLINK-19249 > URL: https://issues.apache.org/jira/browse/FLINK-19249 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Congxian Qiu >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > {quote}encountered this error on 1.7, after going through the master code, I > think the problem is still there > {quote} > When the network environment is not so good, the connection between the > server and the client may be disconnected innocently. After the > disconnection, the server will receive the IOException such as below > {code:java} > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:51) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) > at java.lang.Thread.run(Thread.java:748) > {code} > then release the view reader. > But the job would not fail until the downstream detect the disconnection > because of {{channelInactive}} later(~10 min). between such time, the job can > still process data, but the broken channel can't transfer any data or event, > so snapshot would fail during this time. this will cause the job to replay > many data after failover. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.
[ https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742338#comment-17742338 ] Liu commented on FLINK-19249: - [~ym] [~zjwang] [~pnowojski] [~xtsong] Any progress on this issue? In our case, the job stops to process any data and we find that the method exceptionCaught in PartitionRequestQueue is called. As the suggestions in the discuss, we may need add hook to fail the task or the taskmanager. > Detect broken connections in case TCP Timeout takes too long. > - > > Key: FLINK-19249 > URL: https://issues.apache.org/jira/browse/FLINK-19249 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Congxian Qiu >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > {quote}encountered this error on 1.7, after going through the master code, I > think the problem is still there > {quote} > When the network environment is not so good, the connection between the > server and the client may be disconnected innocently. After the > disconnection, the server will receive the IOException such as below > {code:java} > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:51) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) > at java.lang.Thread.run(Thread.java:748) > {code} > then release the view reader. > But the job would not fail until the downstream detect the disconnection > because of {{channelInactive}} later(~10 min). between such time, the job can > still process data, but the broken channel can't transfer any data or event, > so snapshot would fail during this time. this will cause the job to replay > many data after failover. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.
[ https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336090#comment-17336090 ] Flink Jira Bot commented on FLINK-19249: This issue was labeled "stale-major" 7 ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Detect broken connections in case TCP Timeout takes too long. > - > > Key: FLINK-19249 > URL: https://issues.apache.org/jira/browse/FLINK-19249 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Congxian Qiu >Priority: Major > Labels: stale-major > > {quote}encountered this error on 1.7, after going through the master code, I > think the problem is still there > {quote} > When the network environment is not so good, the connection between the > server and the client may be disconnected innocently. After the > disconnection, the server will receive the IOException such as below > {code:java} > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:51) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) > at java.lang.Thread.run(Thread.java:748) > {code} > then release the view reader. > But the job would not fail until the downstream detect the disconnection > because of {{channelInactive}} later(~10 min). between such time, the job can > still process data, but the broken channel can't transfer any data or event, > so snapshot would fail during this time. this will cause the job to replay > many data after failover. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.
[ https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17327653#comment-17327653 ] Flink Jira Bot commented on FLINK-19249: This major issue is unassigned and itself and all of its Sub-Tasks have not been updated for 30 days. So, it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized. > Detect broken connections in case TCP Timeout takes too long. > - > > Key: FLINK-19249 > URL: https://issues.apache.org/jira/browse/FLINK-19249 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Congxian Qiu >Priority: Major > Labels: stale-major > > {quote}encountered this error on 1.7, after going through the master code, I > think the problem is still there > {quote} > When the network environment is not so good, the connection between the > server and the client may be disconnected innocently. After the > disconnection, the server will receive the IOException such as below > {code:java} > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:51) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) > at java.lang.Thread.run(Thread.java:748) > {code} > then release the view reader. > But the job would not fail until the downstream detect the disconnection > because of {{channelInactive}} later(~10 min). between such time, the job can > still process data, but the broken channel can't transfer any data or event, > so snapshot would fail during this time. this will cause the job to replay > many data after failover. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.
[ https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245663#comment-17245663 ] Yuan Mei commented on FLINK-19249: -- A bit more implementation suggestions: I would suggest adding the hook (no matter report to JM or fail task) in `PartitionRequestQueue#handleException` instead of simply `releaseAllResources()`, we can add hooks to report to JM or flag the result partitions here. > Detect broken connections in case TCP Timeout takes too long. > - > > Key: FLINK-19249 > URL: https://issues.apache.org/jira/browse/FLINK-19249 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Congxian Qiu >Assignee: Yuan Mei >Priority: Major > > {quote}encountered this error on 1.7, after going through the master code, I > think the problem is still there > {quote} > When the network environment is not so good, the connection between the > server and the client may be disconnected innocently. After the > disconnection, the server will receive the IOException such as below > {code:java} > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:51) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) > at java.lang.Thread.run(Thread.java:748) > {code} > then release the view reader. > But the job would not fail until the downstream detect the disconnection > because of {{channelInactive}} later(~10 min). between such time, the job can > still process data, but the broken channel can't transfer any data or event, > so snapshot would fail during this time. this will cause the job to replay > many data after failover. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.
[ https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241327#comment-17241327 ] Yuan Mei commented on FLINK-19249: -- The problem has been thoroughly explained in both FLINK-16030 and this ticket. Summarized in short, when the network environment is unstable, downstream TMs sometimes may not be able to respond to such errors until the TCP-keepalive probe is triggered. The reason is * Upstream netty handles error/exception by 1) sending `ErrorResponse` to downstream and 2). simply releases sub-partition view resources but nothing else. It replies on downstream TM to handle the Error. But as we can see, `ErrorResponse` may not reach downstream due to the unstable environment; and TCP-keepalive can not be too short without unwanted side effects (default 2 hours). As a result, we probably need to do SOMETHING when upstream detect such errors, then the question is how to detect and what to do after detection. Put some ideas here for discussion and also in mind that this happens rarely. *1. Where to detect the error?* When exception caught; for example when failing to send the data. It is a better place than `ChannelInactive`, since ChannelInactive may be caused by different reasons. As long as upstream fails to send data, the job loses data since we do not retry when sending data. *2. Should we allow reconnection from the downstream or tolerant intermittent network?* This won't work unless we have retry logic from upstream; But `retry` also means waiting for responding, and this will definitely affect performance. *3. Is failing the job, which contains tasks unable to send data to the downstream enough?* I think `yes` for now (in the streaming case: task failure -> the entire job failure), but may not be extensible for cases of batch/single task failover. The answer `yes` is also based on how the current physical tcp-connections are shared: different jobs do not share tcp-connections (please correct me if I am wrong). I personally do not think “Job/Task Failover” is the right direction to go; Conceptually this is a TM-level error; there is not and probably should not be a direct hook up between netty -> task (it is still doable through ResultPartition though). *The more reasonable way* to go is to report the exception to JM (enrich the exception); JM decides what to react/fail (in this case, JM need to reboot both the upstream and downstream TMs); and as a result failover all jobs containing in both upstream and downstream TMs. > Detect broken connections in case TCP Timeout takes too long. > - > > Key: FLINK-19249 > URL: https://issues.apache.org/jira/browse/FLINK-19249 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Congxian Qiu >Assignee: Yuan Mei >Priority: Major > > {quote}encountered this error on 1.7, after going through the master code, I > think the problem is still there > {quote} > When the network environment is not so good, the connection between the > server and the client may be disconnected innocently. After the > disconnection, the server will receive the IOException such as below > {code:java} > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:51) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) > at java.lang.Thread.run(Thread.java:748) > {code} > then release the view reader. > But the job would not fail until the downstream detect the disconnection > because of {{channelInactive}} later(~10 min). between such time, the job can >
[jira] [Commented] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.
[ https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217343#comment-17217343 ] Zhijiang commented on FLINK-19249: -- I dig out the previously discussed issues [FLINK-16030|https://issues.apache.org/jira/browse/FLINK-16030]which might have the same direction with it. > Detect broken connections in case TCP Timeout takes too long. > - > > Key: FLINK-19249 > URL: https://issues.apache.org/jira/browse/FLINK-19249 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Congxian Qiu(klion26) >Priority: Major > > {quote}encountered this error on 1.7, after going through the master code, I > think the problem is still there > {quote} > When the network environment is not so good, the connection between the > server and the client may be disconnected innocently. After the > disconnection, the server will receive the IOException such as below > {code:java} > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:51) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) > at java.lang.Thread.run(Thread.java:748) > {code} > then release the view reader. > But the job would not fail until the downstream detect the disconnection > because of {{channelInactive}} later(~10 min). between such time, the job can > still process data, but the broken channel can't transfer any data or event, > so snapshot would fail during this time. this will cause the job to replay > many data after failover. -- This message was sent by Atlassian Jira (v8.3.4#803005)