Thanks for raising the concern, @shuyi, and for the explanation, @konstantin.

From a quick look at the Flink documentation, it seems that users have full
control over the timeout behavior [1]. However, unlike inside
AsyncWaitOperator, user code cannot easily access the operator's internal
state to, for example, put the message back into the async buffer with a
retry tag. So I also think that providing a set of common timeout-handling
strategies is a good idea and could be a very useful feature for Flink
users.

Regarding the questions and concerns:
1. Should the "retry counter" be reset, or continue where it left off?
- This is definitely a good point, as the counter would need to go into the
operator state if we decide to carry it over. Functionality-wise, I think
it should be reset, because after a restart it no longer represents the
same transient state as at the time of failure.

2. When should AsyncDataStream.orderedWait() skip a record?
- I assume this should be configurable by the user. For example, we could
have additional properties for each strategy described by @shuyi, such as a
combination of:
  - (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY)
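
To make the combination above a bit more concrete, here is a rough sketch
in plain Java with no Flink dependency. Everything in it is hypothetical:
AsyncRetryConfig, the RetryFailurePolicy values, and the baseInterval
parameter are names made up for illustration, and only the three strategy
names come from @shuyi's proposal. It only shows how a delay schedule could
be derived from such a configuration, not how it would be wired into
AsyncWaitOperator.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Strategy names from the proposal in this thread; not an existing Flink API.
enum RetryStrategy { FAIL_OPERATOR, FIX_INTERVAL_RETRY, EXP_BACKOFF_RETRY }

// Hypothetical: what to do with a record once all retries are used up.
enum RetryFailurePolicy { FAIL_OPERATOR, SKIP_RECORD }

// Hypothetical holder for (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY).
final class AsyncRetryConfig {
    // Upper bound so that 1L << (attempt - 1) stays far away from overflow.
    private static final int MAX_SUPPORTED_RETRIES = 30;

    final RetryStrategy strategy;
    final int maxRetryCount;
    final RetryFailurePolicy failurePolicy;
    final Duration baseInterval; // assumed extra parameter: fixed or first delay

    AsyncRetryConfig(RetryStrategy strategy, int maxRetryCount,
                     RetryFailurePolicy failurePolicy, Duration baseInterval) {
        if (maxRetryCount < 0 || maxRetryCount > MAX_SUPPORTED_RETRIES) {
            throw new IllegalArgumentException("maxRetryCount out of range: " + maxRetryCount);
        }
        if (baseInterval.isNegative()) {
            throw new IllegalArgumentException("baseInterval must not be negative");
        }
        this.strategy = strategy;
        this.maxRetryCount = maxRetryCount;
        this.failurePolicy = failurePolicy;
        this.baseInterval = baseInterval;
    }

    // Number of retries actually performed; FAIL_OPERATOR never retries.
    int effectiveRetries() {
        return strategy == RetryStrategy.FAIL_OPERATOR ? 0 : maxRetryCount;
    }

    // Delay before the given retry, where attempt 1 is the first retry.
    Duration delayBefore(int attempt) {
        if (attempt < 1 || attempt > effectiveRetries()) {
            throw new IllegalArgumentException("no retry number " + attempt);
        }
        if (strategy == RetryStrategy.FIX_INTERVAL_RETRY) {
            return baseInterval;
        }
        // EXP_BACKOFF_RETRY: the delay doubles with every retry.
        return baseInterval.multipliedBy(1L << (attempt - 1));
    }

    // All delays in order, one entry per retry.
    List<Duration> schedule() {
        List<Duration> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= effectiveRetries(); attempt++) {
            delays.add(delayBefore(attempt));
        }
        return delays;
    }

    // Sum of all delays in the schedule.
    Duration totalDelay() {
        Duration total = Duration.ZERO;
        for (Duration delay : schedule()) {
            total = total.plus(delay);
        }
        return total;
    }
}
```

With EXP_BACKOFF_RETRY, a base interval of 2s and MAX_RETRY_COUNT = 4,
schedule() yields 2s, 4s, 8s and 16s, for a total of 30s. If the retries
ran in user code as Konstantin suggested, the operator timeout would have
to cover at least that total plus the duration of the calls themselves.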

I've also created a JIRA ticket [2] for the discussion; please feel free to
share your thoughts and comments.

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
[2] https://issues.apache.org/jira/browse/FLINK-11909



On Tue, Mar 12, 2019 at 6:29 AM Konstantin Knauf <konstan...@ververica.com>
wrote:

> Hi Shuyi,
>
> I am not sure. You could handle retries in the user code within
> org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke
> without using a DLQ as described in my original answer to William.  On the
> other hand, I agree that it could be easier for the user and it is indeed a
> common scenario.
>
> Two follow up questions come to mind:
>
>    - When a Flink job fails and restarts, would you expect the "retry
>    counter" to be reset or to continue where it left off?
>    - With AsyncDataStream.orderedWait(), when would you expect a record to
>    be skipped? After the final timeout, after the first timeout?
>
> Would you like to create a JIRA ticket [1] for this improvement with
> answers to the questions above, so that we can continue the discussion
> there?
>
> Best,
>
> Konstantin
>
> [1]
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues
>
>
> On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen <suez1...@gmail.com> wrote:
>
>> Hi Konstantin,
>>
>> (cc Till since he owns the code)
>>
>> For async-IO, IO failure and retry is a common & expected pattern. In
>> most use cases, users will need to deal with IO failure and retry.
>> Therefore, I think it's better to address the problem in Flink, rather
>> than having users implement custom logic in user code, for a better dev
>> experience. We have a similar problem in many of our use cases. To enable
>> backoff and retry, we need to put the failed message into a DLQ (another
>> Kafka topic) and re-ingest the message from the DLQ topic to retry, which
>> is manual/cumbersome and requires setting up an extra Kafka topic.
>>
>> Can we add multiple strategies to handle async IO failure in the
>> AsyncWaitOperator? I propose the following strategies:
>>
>>
>>    - FAIL_OPERATOR (default & current behavior)
>>    - FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N
>>    times)
>>    - EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>>
>> What do you guys think? Thanks a lot.
>>
>> Shuyi
>>
>> On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf <konstan...@ververica.com>
>> wrote:
>>
>>> Hi William,
>>>
>>> the AsyncOperator does not have such a setting. It is "merely" a wrapper
>>> around an asynchronous call, which provides integration with Flink's state
>>> & time management.
>>>
>>> I think the way to go would be to do the exponential back-off in the
>>> user code and set the timeout of the AsyncOperator to the sum of the
>>> timeouts in the user code (e.g. 2s + 4s + 8s + 16s).
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>>
>>> On Thu, Mar 7, 2019 at 5:20 PM William Saar <will...@saar.se> wrote:
>>>
>>>> Hi,
>>>> Is there a way to specify an exponential backoff strategy for when
>>>> async function calls fail?
>>>>
>>>> I have an async function that does web requests to a rate-limited API.
>>>> Can you handle that with settings on the async function call?
>>>>
>>>> Thanks,
>>>> William
>>>>
>>>>
>>>>
>>>
>>>
>>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
