Hi Lincoln,

Thanks very much for the reply! The issue seems to occur both in local 
development in the IDE, and when running in a Flink cluster. Below is the full 
Java code to replicate the issue. I generated an empty project following the 
instructions at 
https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/datastream/,
 namely I ran this:


$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.16.0 \
    -DgroupId=timeoutretry \
    -DartifactId=timeoutretry \
    -Dversion=0.1 \
    -Dpackage=timeoutretry \
    -DinteractiveMode=false

I then deleted the two generated files and created one called 
TimeoutRetry.java, with the code below. Here I've created a dummy source that 
simply emits one value, for simplicity. (Note that I first came across the 
issue when working with a Kinesis source, but in order to rule Kinesis out of 
the equation I created the dummy source instead.) Then I added an async 
function which I've hard-coded to wait 500ms then fail.


package timeoutretry;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutRetry {
    private static void log(String message) {
        System.out.println(
                LocalDateTime.now().format(DateTimeFormatter.ISO_TIME) + " :: " + message);
    }

    public static class SingleValueSource extends RichSourceFunction<String> {
        private volatile boolean cancelled = false;
        private volatile boolean alreadySentValue = false;

        @Override
        public void run(SourceContext<String> ctx) {
            while (!cancelled) {
                synchronized (ctx.getCheckpointLock()) {
                    if (!alreadySentValue) {
                        ctx.collect("foo");
                        alreadySentValue = true;
                    }
                }
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    public static class ExampleRichAsyncFunction
            extends RichAsyncFunction<String, String> {

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            log("Received " + input);

            // resultFuture.completeExceptionally(new Exception("Dummy error"));
            // The line above gets expected output:
            //   09:39:50.668851 :: Received foo
            //   09:39:52.671624 :: Received foo
            //   09:39:54.671417 :: Timed out handling foo

            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                resultFuture.completeExceptionally(new Exception("Dummy error"));
            });
            // The block above gets unexpected output:
            //    09:57:01.574928 :: Received foo
            //    09:57:04.084659 :: Received foo
            //    09:57:05.581016 :: Timed out handling foo
            //    09:57:06.590309 :: Received foo
            //    09:57:09.099132 :: Received foo
            //    09:57:11.605754 :: Received foo
            //    09:57:14.114028 :: Received foo
            // This keeps going until the maxAttempts set on the
            // AsyncRetryStrategies.FixedDelayRetryStrategyBuilder is reached.
        }

        @Override
        public void timeout(String input, ResultFuture<String> resultFuture) {
            log("Timed out handling " + input);
            resultFuture.complete(Collections.emptyList());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env
                .addSource(new SingleValueSource())
                .name("single-value");

        AsyncDataStream
                .unorderedWaitWithRetry(
                        source,
                        new ExampleRichAsyncFunction(),
                        4,
                        TimeUnit.SECONDS,
                        100,
                        new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(10, 2000)
                                .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                                .build()
                )
                .print();

        env.execute("Timeout Retry Example");
    }
}

I hope that gives you the information you need. Let me know if there's anything 
else I can do to help troubleshoot. I'm still pretty new to Flink, so I might 
well just have misunderstood something or set something up incorrectly.

Thanks in advance,

Yoni.
________________________________
From: Lincoln Lee <lincoln.8...@gmail.com>
Sent: 14 December 2022 13:51
To: Yoni Gibbs <yonigi...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: AsyncDataStream: Retries keep executing after timeout

Hi,
   Is this running locally as an IT case (integration test), or as a streaming 
job on a cluster? If it's the former, one thing I can think of is that local 
tests often use a bounded data source with only a few test records, which ends 
input very quickly and then triggers the endOfInput logic of AsyncWaitOperator. 
That logic finishes all in-flight delayed retry items immediately (asyncInvoke 
is called as the last attempt before the operator exits, and its outcome is 
taken as the final result, regardless of whether it has timed out or not), so 
there may be one more attempt than when the job keeps running normally.
   For a long-running job, the retry starts from scratch when the job recovers 
from a restart (regardless of how many times it has been retried before), which 
may also result in more attempts and a longer retry time for those elements.
   If you can provide more information about the test, maybe we can further 
clarify what the problem is.

Best,
Lincoln Lee


Yoni Gibbs <yonigi...@hotmail.com> wrote on Tue, 13 Dec 2022 at 23:46:
Hi,

I've got a Kinesis consumer which reacts to each record by doing some async 
work using an implementation of RichAsyncFunction. I'm adding a retry strategy. 
After x failed attempts I want this to time out and give up by returning no 
data (i.e. not be treated as a failure).

Here is a cut down version of my code, which works as expected (in Kotlin, I 
hope that's OK - can supply Java translation if required):

val targetStream = AsyncDataStream
    .unorderedWaitWithRetry(
        inputStream,
        object : RichAsyncFunction<String, String>() {
            override fun asyncInvoke(
                input: String,
                resultFuture: ResultFuture<String>
            ) {
                println("Received input: $input")
                resultFuture.completeExceptionally(
                    Exception("Error from inside CompletableFuture")
                )
            }

            override fun timeout(
                input: String,
                resultFuture: ResultFuture<String>
            ) {
                println("Timeout")
                resultFuture.complete(listOf())
            }
        },
        4,
        TimeUnit.SECONDS,
        100,
        AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(5, 2_000)
            .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
            .build()
    )

This will time out after 4 seconds, and the retry strategy is set to retry 
every two seconds. If I run that I get the output I expect, namely:

Received input: foo
Received input: foo
Timeout

Importantly, I see that asyncInvoke is only called twice, because by the time 
the third invocation is due to occur, the timeout has already kicked in and 
marked this record as handled.
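
The expected timing can be sketched in plain Java. This is only a rough model 
of the expectation described above, not how Flink's AsyncWaitOperator actually 
schedules retries. The class and method names (RetryTimeline, 
attemptStartTimes) are hypothetical, and the model assumes each retry is 
scheduled a fixed delay after the previous attempt fails.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTimeline {
    /**
     * Returns the start times (ms since the record arrived) of every attempt
     * that begins strictly before the timeout fires.
     * workMs is how long one attempt takes to fail (0 = fails immediately).
     */
    public static List<Long> attemptStartTimes(
            long timeoutMs, long delayMs, long workMs, int maxAttempts) {
        List<Long> starts = new ArrayList<>();
        long t = 0;
        for (int attempt = 0; attempt < maxAttempts && t < timeoutMs; attempt++) {
            starts.add(t);
            // The next attempt starts after this one fails plus the fixed delay.
            t += workMs + delayMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // 4 s timeout, 2 s fixed delay, attempts fail immediately, max 5 attempts.
        System.out.println(attemptStartTimes(4000, 2000, 0, 5)); // [0, 2000]
    }
}
```

Under these assumptions the third attempt would be due at 4000 ms, which is 
when the timeout fires, so only two calls to asyncInvoke are expected.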

However the above is clearly unrealistic, as it calls 
resultFuture.completeExceptionally immediately rather than asynchronously after 
some work has taken place. So now I replace the asyncInvoke implementation above 
with the following:


override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
    println("Received input: $input")
    CompletableFuture.supplyAsync {
        Thread.sleep(500)
        resultFuture.completeExceptionally(
            Exception("Error from inside CompletableFuture")
        )
    }
}

Now I get output which I don't expect, which shows that after the timeout, 
asyncInvoke continues to be called a few more times.

That seems wrong to me: shouldn't it stop being called because timeout has 
already been invoked and it called resultFuture.complete()?
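
The "first completion wins" behaviour behind that expectation can be 
illustrated with java.util.concurrent.CompletableFuture. This is only an 
analogy: it is an assumption, not a confirmed fact, that Flink's ResultFuture 
behaves comparably once timeout() has completed it. The class name 
FirstCompletionWins is hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FirstCompletionWins {
    /**
     * Completes a future the way timeout() does (empty result), then tries
     * to fail it afterwards. Returns whether the late failure was accepted.
     */
    public static boolean lateFailureAccepted() {
        CompletableFuture<List<String>> result = new CompletableFuture<>();
        // The "timeout" path completes the future first, with no data.
        result.complete(Collections.emptyList());
        // A late failure from the async work arrives afterwards; on an
        // already-completed CompletableFuture this call returns false.
        return result.completeExceptionally(new Exception("Dummy error"));
    }

    public static void main(String[] args) {
        System.out.println(lateFailureAccepted()); // false
    }
}
```

With a plain CompletableFuture the late failure is ignored, which is why one 
would not expect it to trigger any further retries.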

I might well just be misunderstanding something here.

Thanks in advance,

Yoni.
