[ 
https://issues.apache.org/jira/browse/FLINK-32362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733349#comment-17733349
 ] 

Cai Liuyang edited comment on FLINK-32362 at 6/16/23 6:38 AM:
--------------------------------------------------------------

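[~fanrui] There are two ways to fix this:
 # The simple way: catch the exception, log it, and wait for the next run, for example:
{code:java}
try {
    for (Integer subtaskId : subTaskIds) {
        context.sendEventToSourceOperator(
                subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
    }
} catch (Throwable t) {
    // Swallow the failure so the periodic task keeps being scheduled;
    // the next run announces the combined watermark again.
    LOG.warn(
            "Announcing the newest combined watermark to the sources failed; a task may be"
                    + " in failover. Will retry on the next announcement.",
            t);
}{code}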

 # Add a trySendEventToSourceOperator() method that only sends the event when the 
task is ready and does not throw an exception if the task is not ready.
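To make the second option concrete, here is a minimal, self-contained sketch. FakeContext, markReady and the readiness set are hypothetical stand-ins for the coordinator context, and trySendEventToSourceOperator is only the method proposed above; none of this is Flink's actual API.

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TrySendSketch {

    // Hypothetical stand-in for the coordinator context; not Flink's real API.
    static class FakeContext {
        final Set<Integer> readySubtasks = new HashSet<>();
        final List<String> delivered = new ArrayList<>();

        void markReady(int subtaskId) {
            readySubtasks.add(subtaskId);
        }

        // Mirrors the failure from the issue: throws if the subtask is not ready.
        void sendEventToSourceOperator(int subtaskId, long watermark) {
            if (!readySubtasks.contains(subtaskId)) {
                throw new IllegalStateException(
                        "subtask " + subtaskId + " is not ready yet to receive events");
            }
            delivered.add(subtaskId + ":" + watermark);
        }

        // Proposed option 2 (hypothetical): skip a not-ready subtask instead of
        // throwing. Returns true if the event was sent.
        boolean trySendEventToSourceOperator(int subtaskId, long watermark) {
            if (!readySubtasks.contains(subtaskId)) {
                return false;
            }
            sendEventToSourceOperator(subtaskId, watermark);
            return true;
        }
    }

    public static void main(String[] args) {
        FakeContext ctx = new FakeContext();
        ctx.markReady(0);
        ctx.markReady(2); // subtask 1 is treated as "in failover"
        for (int subtaskId : List.of(0, 1, 2)) {
            if (!ctx.trySendEventToSourceOperator(subtaskId, 1000L)) {
                System.out.println("skipped subtask " + subtaskId + ", will retry next period");
            }
        }
        System.out.println("delivered=" + ctx.delivered);
    }
}
{code}

Note that a check-then-send method like this only avoids the "not ready" case. A failure that happens after the readiness check, such as the RPC timeout mentioned below, would still surface as an exception.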

I prefer the first one because it is simple and it also covers other 
exceptions, such as an RPC timeout.
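In the first option as written, the try/catch wraps the whole loop, so a failure for one subtask also skips the subtasks after it until the next run. A possible variant, sketched below as an assumption rather than part of the proposal, catches per subtask. Here the hypothetical announceToAll takes an IntConsumer that stands in for context.sendEventToSourceOperator(...). It catches Exception rather than Throwable so that JVM errors still propagate; that is a design choice of this sketch.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

public class PerSubtaskCatchSketch {

    // Hypothetical helper: sends to every subtask and isolates failures so that
    // one failing subtask does not prevent the others from receiving the event.
    // 'send' stands in for context.sendEventToSourceOperator(...).
    static List<Integer> announceToAll(List<Integer> subtaskIds, IntConsumer send) {
        List<Integer> failed = new ArrayList<>();
        for (int subtaskId : subtaskIds) {
            try {
                send.accept(subtaskId);
            } catch (Exception e) {
                // Log and move on; the next periodic run announces again.
                System.err.println(
                        "announce to subtask " + subtaskId + " failed: " + e.getMessage());
                failed.add(subtaskId);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Integer> delivered = new ArrayList<>();
        List<Integer> failed = announceToAll(List.of(0, 1, 2), id -> {
            if (id == 1) {
                // Simulates a subtask in failover (or an RPC failure).
                throw new IllegalStateException("subtask 1 is not ready yet to receive events");
            }
            delivered.add(id);
        });
        System.out.println("delivered=" + delivered + " failed=" + failed);
    }
}
{code}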



> SourceAlignment announceCombinedWatermark period task maybe lost
> ----------------------------------------------------------------
>
>                 Key: FLINK-32362
>                 URL: https://issues.apache.org/jira/browse/FLINK-32362
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.0
>            Reporter: Cai Liuyang
>            Assignee: Cai Liuyang
>            Priority: Major
>
> When using source alignment, we found another problem: 
> announceCombinedWatermark may throw an exception (such as "subtask 25 is not 
> ready yet to receive events", when that subtask may be in failover), which 
> stops the periodic task from running at all (a ScheduledThreadPoolExecutor 
> suppresses all subsequent runs of a periodic task once one run throws an exception).
> I think we should make announceCombinedWatermark more robust so that it 
> never throws (if sending fails, just wait for the next send) 
> (code: 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
>  )
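
The failure mode described above follows from the JDK contract for ScheduledExecutorService.scheduleAtFixedRate: if any execution of the task throws, subsequent executions are suppressed. The sketch below reproduces that with a plain JDK executor and shows that wrapping the body in a try/catch keeps the task alive. The names guarded and countRuns are illustrative only; this does not use Flink's own executor or SourceCoordinator code.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicTaskSketch {

    // Wraps a periodic task so an exception in one run cannot cancel future runs.
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Exception e) {
                System.err.println("periodic run failed, retrying next period: " + e.getMessage());
            }
        };
    }

    // Schedules a task that always throws and counts how often it actually runs.
    static int countRuns(boolean guard) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("subtask 25 is not ready yet to receive events");
        };
        executor.scheduleAtFixedRate(
                guard ? guarded(failing) : failing, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        // Unguarded: the first exception silently stops the schedule (runs == 1).
        System.out.println("unguarded runs: " + countRuns(false));
        // Guarded: the task keeps running every period despite the failures.
        System.out.println("guarded runs: " + countRuns(true));
    }
}
{code}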



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
