[jira] [Comment Edited] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.

2020-12-01 Thread Yuan Mei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241327#comment-17241327
 ] 

Yuan Mei edited comment on FLINK-19249 at 12/1/20, 9:59 AM:


The problem has been thoroughly explained in both FLINK-16030 and this ticket. 
In short, when the network environment is unstable, downstream TMs sometimes 
cannot react to such errors until the TCP-keepalive probe is triggered. The 
reason is:
 * Upstream netty handles an error/exception by 1) sending an `ErrorResponse` 
to the downstream and 2) simply releasing the sub-partition view resources, 
nothing else. It relies on the downstream TM to handle the error (see the 
sketch below).
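For illustration, a minimal sketch of that pattern, with hypothetical class and 
message names (this is not the actual Flink netty handler, and a matching 
encoder for the message is assumed):

{code:java}
// Sketch of the upstream error-handling pattern: notify the downstream once,
// release local resources, and do nothing else.
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class UpstreamErrorSketch extends ChannelInboundHandlerAdapter {

    /** Hypothetical stand-in for the ErrorResponse message (an encoder is assumed). */
    public static final class ErrorResponse {
        public final Throwable cause;
        public ErrorResponse(Throwable cause) { this.cause = cause; }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 1) Try to notify the downstream side. If the link is already broken,
        //    this write may never arrive, and nothing retries it.
        ctx.writeAndFlush(new ErrorResponse(cause));

        // 2) Release local sub-partition view resources and stop; recovery is
        //    left entirely to the downstream TM.
        releaseSubpartitionViews(ctx);
    }

    private void releaseSubpartitionViews(ChannelHandlerContext ctx) {
        // placeholder for releasing the reader/view resources tied to this channel
        ctx.close();
    }
}
{code}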

But as we can see, the `ErrorResponse` may not reach the downstream due to the 
unstable environment, and TCP-keepalive cannot be made too short without 
unwanted side effects (the default is 2 hours).
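For reference, a sketch of what the keepalive knob looks like from the 
application side (generic netty `Bootstrap`, not Flink's actual client setup): 
the application can only switch keepalive on, while the roughly 2-hour probe 
timing is owned by the OS (e.g. `net.ipv4.tcp_keepalive_time` on Linux), so 
shortening it is a machine-wide decision with the side effects mentioned above.

{code:java}
// Sketch: enabling TCP keepalive on a netty client channel. The probe timing
// comes from OS-level settings, not from this option.
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class KeepAliveClientSketch {
    public static Bootstrap newBootstrap() {
        return new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true) // relies on the OS keepalive timers
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // application handlers would be added here
                    }
                });
    }
}
{code}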

As a result, we probably need to do SOMETHING when the upstream detects such 
errors; the question is then how to detect them and what to do after detection. 
I put some ideas here for discussion, keeping in mind that this happens rarely.



*1. Where to detect the error?*

When an exception is caught, for example when failing to send data. This is a 
better place than `ChannelInactive`, since `ChannelInactive` may be triggered 
for different reasons. As long as the upstream fails to send data, the job 
loses data, because we do not retry when sending data (see the sketch below).
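One way to picture "detect it when the send fails" is a listener on the write 
future of the outbound channel, rather than waiting for `ChannelInactive` 
(generic netty API; the reporting hook is hypothetical, not how Flink is 
currently wired):

{code:java}
// Sketch: reacting to a failed write immediately instead of waiting for
// channelInactive on the downstream side.
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

public final class SendFailureDetector {

    public static void writeWithFailureCheck(Channel channel, Object buffer, Runnable onSendFailure) {
        channel.writeAndFlush(buffer).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                // The data is lost (there is no resend), so surface the error
                // here instead of relying on the downstream side to notice.
                onSendFailure.run();
            }
        });
    }
}
{code}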

*2. Should we allow reconnection from the downstream, or tolerate an 
intermittent network?*

This won't make sense unless we have retry logic on the upstream side; but 
retrying also means waiting for a response, and that will definitely affect 
performance (see the sketch below).
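A purely hypothetical sketch of what upstream retry would imply: every send has 
to hold on to the buffer and wait for an acknowledgement (or a timeout) before 
moving on, which is exactly the latency cost argued against above.

{code:java}
// Hypothetical retry-with-acknowledgement loop; the names are made up for
// illustration and nothing like this exists in Flink today.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class RetrySketch {

    interface AckingSender {
        /** Sends one buffer and completes the future once the receiver acknowledges it. */
        CompletableFuture<Void> send(byte[] buffer);
    }

    static void sendWithRetry(AckingSender sender, byte[] buffer, int maxAttempts) throws Exception {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Waiting for the ack is what makes the retry safe, and also what makes it slow.
                sender.send(buffer).get(10, TimeUnit.SECONDS);
                return;
            } catch (TimeoutException e) {
                if (attempt == maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
{code}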

*3. Is failing the job that contains the tasks unable to send data to the 
downstream enough?*

I think `yes` for now (in the streaming case: task failure -> entire job 
failure), but this may not be extensible to batch/single-task failover.

The answer `yes` is also based on how the physical TCP connections are 
currently shared: different jobs do not share TCP connections (please correct 
me if I am wrong). 

 

I personally do not think "Job/Task Failover" is the right direction to go. 
Conceptually this is a TM-level error; there is not, and probably should not 
be, a direct hook-up from netty to a task (it is still doable through 
ResultPartition, though).

*The more reasonable way* to go is to report the exception to the JM (enriching 
the exception); the JM decides how to react and what to fail (in this case, the 
JM needs to restart both the upstream and the downstream TM), and as a result 
fails over all jobs running on both the upstream and downstream TMs (similar to 
TM#RpcPartitionStateChecker).
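A rough sketch of that direction, with entirely hypothetical interface and 
method names (the real counterpart would be a method on the JobMaster RPC 
gateway, analogous in spirit to the RPC behind TM#RpcPartitionStateChecker):

{code:java}
// Hypothetical "report the broken connection to the JM and let it decide"
// hook; the gateway interface below is invented for illustration only.
import java.util.concurrent.CompletableFuture;

public final class ConnectionErrorReportingSketch {

    /** Hypothetical slice of a JobMaster RPC gateway. */
    interface JobMasterConnectionGateway {
        CompletableFuture<Void> reportBrokenConnection(
                String upstreamTaskManagerId, String downstreamTaskManagerId, Throwable cause);
    }

    static void onUpstreamSendFailure(
            JobMasterConnectionGateway jobMaster,
            String localTmId,
            String remoteTmId,
            Throwable cause) {
        // Enrich the exception with both endpoints so the JM can decide to
        // restart both TMs and fail over every job running on them.
        Exception enriched = new Exception(
                "Broken data connection between TM " + localTmId + " and TM " + remoteTmId, cause);
        jobMaster.reportBrokenConnection(localTmId, remoteTmId, enriched);
    }
}
{code}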

 


[jira] [Comment Edited] (FLINK-19249) Detect broken connections in case TCP Timeout takes too long.

2020-10-20 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217343#comment-17217343
 ] 

Zhijiang edited comment on FLINK-19249 at 10/20/20, 6:50 AM:

I dug out the previously discussed issue 
[FLINK-16030|https://issues.apache.org/jira/browse/FLINK-16030], which might 
point in the same direction as this ticket.



> Detect broken connections in case TCP Timeout takes too long.
> -
>
> Key: FLINK-19249
> URL: https://issues.apache.org/jira/browse/FLINK-19249
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> {quote}encountered this error on 1.7, after going through the master code, I 
> think the problem is still there
> {quote}
> When the network environment is not so good, the connection between the 
> server and the client may be disconnected unexpectedly. After the 
> disconnection, the server will receive an IOException such as the one below
> {code:java}
> java.io.IOException: Connection timed out
>  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>  at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>  at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>  at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
> then releases the view reader.
> But the job would not fail until the downstream detects the disconnection 
> via {{channelInactive}} later (~10 min). During that time, the job can still 
> process data, but the broken channel can't transfer any data or events, so 
> snapshots would fail during this period. This causes the job to replay a lot 
> of data after failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)