Hi Lincoln,

Thanks very much for the reply! The issue seems to occur both in local development in the IDE and when running on a Flink cluster. Below is the full Java code to replicate the issue. I generated an empty project following the instructions at https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/datastream/, namely by running this:
```
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.16.0 \
    -DgroupId=timeoutretry \
    -DartifactId=timeoutretry \
    -Dversion=0.1 \
    -Dpackage=timeoutretry \
    -DinteractiveMode=false
```

I then deleted the two generated files and created one called TimeoutRetry.java with the code below. For simplicity, I've created a dummy source that simply emits one value. (I first came across the issue when working with a Kinesis source, but to rule Kinesis out of the equation I created the dummy source instead.) I then added an async function which I've hard-coded to wait 500 ms and then fail.

```java
package timeoutretry;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutRetry {

    private static void log(String message) {
        System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_TIME) + " :: " + message);
    }

    // Emits "foo" once, then keeps running (without emitting) until cancelled.
    public static class SingleValueSource extends RichSourceFunction<String> {

        private volatile boolean cancelled = false;
        private volatile boolean alreadySentValue = false;

        @Override
        public void run(SourceContext<String> ctx) {
            while (!cancelled) {
                synchronized (ctx.getCheckpointLock()) {
                    if (!alreadySentValue) {
                        ctx.collect("foo");
                        alreadySentValue = true;
                    }
                }
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    public static class ExampleRichAsyncFunction extends RichAsyncFunction<String, String> {

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            log("Received " + input);

            // resultFuture.completeExceptionally(new Exception("Dummy error"));
            // The line above gets expected output:
            // 09:39:50.668851 :: Received foo
            // 09:39:52.671624 :: Received foo
            // 09:39:54.671417 :: Timed out handling foo

            // Fail asynchronously after 500 ms instead of immediately.
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                resultFuture.completeExceptionally(new Exception("Dummy error"));
            });
            // The block above gets unexpected output:
            // 09:57:01.574928 :: Received foo
            // 09:57:04.084659 :: Received foo
            // 09:57:05.581016 :: Timed out handling foo
            // 09:57:06.590309 :: Received foo
            // 09:57:09.099132 :: Received foo
            // 09:57:11.605754 :: Received foo
            // 09:57:14.114028 :: Received foo
            // This will keep going for as long as the number of maxAttempts set for the
            // AsyncRetryStrategies.FixedDelayRetryStrategyBuilder.
        }

        @Override
        public void timeout(String input, ResultFuture<String> resultFuture) {
            log("Timed out handling " + input);
            // Give up without failing: emit no results for this element.
            resultFuture.complete(Collections.emptyList());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env
                .addSource(new SingleValueSource())
                .name("single-value");

        AsyncDataStream
                .unorderedWaitWithRetry(
                        source,
                        new ExampleRichAsyncFunction(),
                        4,                  // timeout per element...
                        TimeUnit.SECONDS,   // ...of 4 seconds
                        100,                // capacity (max in-flight requests)
                        // up to 10 attempts, 2000 ms apart, retried whenever an exception occurs
                        new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(10, 2000)
                                .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                                .build()
                )
                .print();

        env.execute("Timeout Retry Example");
    }
}
```

I hope that gives you the information you need. Let me know if there's anything else I can do to help troubleshoot.
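As a quick check on the timestamps in the "unexpected output" comment in `asyncInvoke`, the stdlib-only sketch below parses those seven log lines and measures the gaps between attempts. The class and method names (`LogGaps`, `gapsBetweenAttempts`, `attemptsAfterTimeout`) are made up for illustration; only the log lines come from the code above. The gaps of roughly 2.5 s are consistent with each retry being scheduled 2000 ms after the previous attempt fails 500 ms in, and the count shows how many attempts were still logged after the 4 s timeout fired.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

public class LogGaps {

    // Log lines copied verbatim from the "unexpected output" comment (500 ms async failure).
    static final String[] LOG = {
        "09:57:01.574928 :: Received foo",
        "09:57:04.084659 :: Received foo",
        "09:57:05.581016 :: Timed out handling foo",
        "09:57:06.590309 :: Received foo",
        "09:57:09.099132 :: Received foo",
        "09:57:11.605754 :: Received foo",
        "09:57:14.114028 :: Received foo",
    };

    // Parses the ISO time prefix before " :: ".
    static LocalTime time(String line) {
        return LocalTime.parse(line.substring(0, line.indexOf(" :: ")));
    }

    // Milliseconds (truncated) between consecutive "Received" lines.
    static List<Long> gapsBetweenAttempts(String[] log) {
        List<Long> gaps = new ArrayList<>();
        LocalTime previous = null;
        for (String line : log) {
            if (!line.contains("Received")) {
                continue;
            }
            LocalTime t = time(line);
            if (previous != null) {
                gaps.add(Duration.between(previous, t).toMillis());
            }
            previous = t;
        }
        return gaps;
    }

    // Number of "Received" lines logged after the first "Timed out" line.
    static int attemptsAfterTimeout(String[] log) {
        boolean timedOut = false;
        int count = 0;
        for (String line : log) {
            if (line.contains("Timed out")) {
                timedOut = true;
            } else if (timedOut && line.contains("Received")) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("Gaps between attempts (ms): " + gapsBetweenAttempts(LOG));
        System.out.println("Timeout fired after (ms): "
                + Duration.between(time(LOG[0]), time(LOG[2])).toMillis());
        System.out.println("Attempts after timeout: " + attemptsAfterTimeout(LOG));
    }
}
```

Running `main` prints gaps of `[2509, 2505, 2508, 2506, 2508]` ms, a timeout 4006 ms after the first attempt, and 4 attempts logged after the timeout.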
I'm still pretty new to Flink, so I might well just have misunderstood something or set something up incorrectly.

Thanks in advance,
Yoni.

________________________________
From: Lincoln Lee <lincoln.8...@gmail.com>
Sent: 14 December 2022 13:51
To: Yoni Gibbs <yonigi...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: AsyncDataStream: Retries keep executing after timeout

Hi,

Is this case running like an IT case (integration test) locally, or as a streaming job running on a cluster? If it's the former, one thing I can think of is that local testing with a bounded data source (with only a few test records) will end input very quickly and then trigger the endOfInput logic of AsyncWaitOperator. That logic finishes all in-flight delayed retry items immediately: asyncInvoke is called one last time before the operator exits, and that attempt is taken as the final result regardless of whether the element has timed out or not. This may produce one more attempt than when the job does not end during normal running.

For a long-running job, retries start from scratch when the job recovers from a restart (regardless of how many times the element had already been retried), which may also result in more attempts and a longer retry time for those elements.

If you can provide more information about the test, maybe we can further clarify what the problem is.

Best,
Lincoln Lee

Yoni Gibbs <yonigi...@hotmail.com> wrote on Tuesday, 13 December 2022 at 23:46:

Hi,

I've got a Kinesis consumer which reacts to each record by doing some async work using an implementation of RichAsyncFunction. I'm adding a retry strategy. After x failed attempts I want this to time out and give up by returning no data (i.e. not be treated as a failure).
Here is a cut-down version of my code, which works as expected (in Kotlin; I hope that's OK, and I can supply a Java translation if required):

```kotlin
val targetStream = AsyncDataStream
    .unorderedWaitWithRetry(
        inputStream,
        object : RichAsyncFunction<String, String>() {
            override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
                println("Received input: $input")
                resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
            }

            override fun timeout(input: String, resultFuture: ResultFuture<String>) {
                println("Timeout")
                resultFuture.complete(listOf())
            }
        },
        4,
        TimeUnit.SECONDS,
        100,
        AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(5, 2_000)
            .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
            .build()
    )
```

This will time out after 4 seconds, and the retry strategy is set to retry every two seconds. If I run that, I get the output I expect, namely:

```
Received input: foo
Received input: foo
Timeout
```

Importantly, I see that asyncInvoke is only called twice, because by the time the third invocation is due to occur, the timeout has already kicked in and marked this record as handled.

However, the above is clearly unrealistic, as it calls resultFuture.completeExceptionally immediately rather than asynchronously after some work has taken place. So now I replace the asyncInvoke implementation above with the following:

```kotlin
override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
    println("Received input: $input")
    CompletableFuture.supplyAsync {
        Thread.sleep(500)
        resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
    }
}
```

Now I get output which I don't expect: after the timeout, asyncInvoke continues to be called a few more times. That seems wrong to me. Shouldn't it stop being called, because timeout has already been invoked and has called resultFuture.complete()?

I might well just be misunderstanding something here.

Thanks in advance,
Yoni.
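The expectation in the question can be made concrete with a toy, single-element model on a virtual clock. This is a hypothetical sketch, not Flink's AsyncWaitOperator: the names `TimeoutVsRetry`, `Event` and `simulate` are invented, and it assumes each retry is scheduled `delayMs` after the previous attempt fails `workMs` in, and that `maxAttempts` caps the total number of attempts. With `checkTimeoutBeforeRetry` set to true, a retry that comes due after the timeout is dropped, which is the behavior the question expects; with it false, retries ignore the timeout, which is what the 500 ms variant appears to do.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TimeoutVsRetry {

    // An action scheduled at a virtual time in milliseconds; no real sleeping happens.
    static final class Event {
        final long time;
        final int seq; // tie-breaker: at equal times, events scheduled earlier run first
        final Runnable action;

        Event(long time, int seq, Runnable action) {
            this.time = time;
            this.seq = seq;
            this.action = action;
        }
    }

    // Hypothetical model of one element: every attempt fails after workMs, the next attempt
    // is scheduled delayMs later, and a timeout fires at timeoutMs.
    // Returns the virtual start time of every attempt that actually ran.
    static List<Long> simulate(long workMs, long delayMs, long timeoutMs, int maxAttempts,
                               boolean checkTimeoutBeforeRetry) {
        PriorityQueue<Event> queue = new PriorityQueue<>(
                Comparator.comparingLong((Event e) -> e.time).thenComparingInt(e -> e.seq));
        List<Long> attemptStarts = new ArrayList<>();
        boolean[] timedOut = {false};
        int[] seq = {0};
        long[] now = {0};

        Runnable[] attempt = new Runnable[1];
        attempt[0] = () -> {
            if (checkTimeoutBeforeRetry && timedOut[0]) {
                return; // element already completed by the timeout: drop this retry
            }
            attemptStarts.add(now[0]);
            if (attemptStarts.size() < maxAttempts) {
                // the attempt fails after workMs, then the retry waits delayMs
                queue.add(new Event(now[0] + workMs + delayMs, seq[0]++, attempt[0]));
            }
        };

        queue.add(new Event(0, seq[0]++, attempt[0]));
        queue.add(new Event(timeoutMs, seq[0]++, () -> timedOut[0] = true));

        // Run events in virtual-time order until nothing is left.
        while (!queue.isEmpty()) {
            Event next = queue.poll();
            now[0] = next.time;
            next.action.run();
        }
        return attemptStarts;
    }

    public static void main(String[] args) {
        // Parameters from the Kotlin snippet: 4 s timeout, FixedDelayRetryStrategyBuilder(5, 2_000),
        // and a failure after 500 ms as in the CompletableFuture.supplyAsync variant.
        System.out.println("retries stop at timeout: " + simulate(500, 2000, 4000, 5, true));
        System.out.println("retries ignore timeout:  " + simulate(500, 2000, 4000, 5, false));
    }
}
```

With these parameters, `main` prints `[0, 2500]` on the first line and `[0, 2500, 5000, 7500, 10000]` on the second. The flag only switches between the two behaviors; the model does not try to explain why the immediate-failure variant stops correctly while the 500 ms variant does not.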