Posting this to dev as well...

Thanks Zakelly,
Sounds like a solution could be to add a new, different version of yield
that would actually yield to the checkpoint barrier too. That way operator
implementations could decide whether any state modification may or may not
have happened and could optionally allow a checkpoint to be taken in the
"middle of record processing".
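
To make the idea a bit more concrete, here is a rough toy model of a
priority-aware mailbox. It is only a sketch and not Flink's actual
MailboxExecutor: the class ToyMailbox, the method names tryYield and
tryYieldIncludingBarrier, and the priority value 0 for operator mails are
all made up for illustration. The only thing taken from this thread is that
the barrier sits at priority -1, below what yield() handles.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Toy model of a priority-aware mailbox. NOT Flink's MailboxExecutor; all names are hypothetical. */
public class ToyMailbox {
    /** Barrier priority as mentioned in the thread; the operator priority is an assumption. */
    public static final int BARRIER_PRIORITY = -1;
    public static final int OPERATOR_PRIORITY = 0;

    /** A queued action tagged with a priority. */
    private static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    public void put(int priority, Runnable action) {
        queue.addLast(new Mail(priority, action));
    }

    /** Runs the oldest mail with priority >= minPriority; returns false if there is none. */
    private boolean runOne(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                mail.action.run();
                return true;
            }
        }
        return false;
    }

    /** Models the behaviour described in the thread: the lower-priority barrier is skipped. */
    public boolean tryYield() {
        return runOne(OPERATOR_PRIORITY);
    }

    /** Hypothetical new variant: the caller asserts that it has not modified state yet. */
    public boolean tryYieldIncludingBarrier() {
        return runOne(BARRIER_PRIORITY);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ToyMailbox mailbox = new ToyMailbox();
        mailbox.put(BARRIER_PRIORITY, () -> log.add("checkpoint barrier"));

        // An operator blocked inside processElement yields, but the barrier stays queued.
        System.out.println("tryYield ran a mail: " + mailbox.tryYield());
        // With the hypothetical variant, the barrier is processed mid-record.
        System.out.println("tryYieldIncludingBarrier ran a mail: "
                + mailbox.tryYieldIncludingBarrier());
        System.out.println("processed: " + log);
    }
}
```

An operator like AsyncWaitOperator could call the second variant while it
is blocked before touching any state, and everything else would keep
calling the existing one, so processElement stays "atomic" by default.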

Gyula

On Fri, Mar 15, 2024 at 3:49 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Gyula,
>
> Processing a checkpoint halfway through `processElement` is problematic. The
> current element will not be included in the in-flight input data, and we
> cannot assume that user code has already applied its effect to the state.
> So the best way is to treat `processElement` as an 'atomic' operation. I
> guess that's why the priority of the cp barrier is set low.
> However, the AsyncWaitOperator is a special case where we know the element
> blocked at `addToWorkQueue` has not started triggering the userFunction.
> Thus I'd suggest putting the element in the queue when the cp barrier
> comes, and taking a snapshot of the whole queue afterwards. The problem
> will be solved. But this approach also involves some code modifications on
> the mailbox executor.
>
>
> Best,
> Zakelly
>
> On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Thank you for the detailed analysis Zakelly.
>>
>> I think we should consider whether yield should process checkpoint
>> barriers because this puts quite a serious limitation on the unaligned
>> checkpoints in these cases.
>> Do you know the reason behind the current priority setting? Is
>> there a problem with processing the barrier here?
>>
>> Gyula
>>
>> On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan <zakelly....@gmail.com>
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> Well, I tried your example in a local mini-cluster, and it seems the source
>>> can take checkpoints but it will block in the following AsyncWaitOperator.
>>> IIUC, the unaligned checkpoint barrier should wait until the current
>>> `processElement` finishes its execution. In your example, the element queue
>>> of `AsyncWaitOperator` will end up full and `processElement` will be
>>> blocked at `addToWorkQueue`. Even though it will call
>>> `mailboxExecutor.yield();`, it still leaves the checkpoint barrier
>>> unprocessed since the priority of the barrier is -1, lower than the one
>>> `yield()` should handle. I verified this using single-step debugging.
>>>
>>> And if one element could finish its async io, the cp barrier can be
>>> processed afterwards. For example:
>>> ```
>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>> env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
>>> env.getConfig().setParallelism(1);
>>> AsyncDataStream.orderedWait(
>>>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>         new AsyncFunction<Long, Long>() {
>>>             boolean first = true;
>>>
>>>             @Override
>>>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>>>                 if (first) {
>>>                     Executors.newSingleThreadExecutor().execute(() -> {
>>>                         try {
>>>                             // process after 20s, only for the first one.
>>>                             Thread.sleep(20000);
>>>                         } catch (Throwable e) {}
>>>                         LOG.info("Complete one");
>>>                         resultFuture.complete(Collections.singleton(1L));
>>>                     });
>>>                     first = false;
>>>                 }
>>>             }
>>>         },
>>>         24,
>>>         TimeUnit.HOURS,
>>>         1)
>>>     .print();
>>> ```
>>> Checkpoint 1 completes normally once the "Complete one" log line is
>>> printed.
>>>
>>> I guess users currently have no means to work around this problem; we
>>> might optimize this later.
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Hey all!
>>>>
>>>> I encountered a strange and unexpected behaviour when trying to use
>>>> unaligned checkpoints with AsyncIO.
>>>>
>>>> If the async operation queue is full and backpressures the pipeline
>>>> completely, then unaligned checkpoints cannot be completed. To me this
>>>> sounds counterintuitive because one of the benefits of the AsyncIO would be
>>>> that we can simply checkpoint the queue and not have to wait for the
>>>> completion.
>>>>
>>>> To repro you can simply run:
>>>>
>>>> AsyncDataStream.orderedWait(
>>>>     env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>>     new AsyncFunction<Long, Long>() {
>>>>         @Override
>>>>         public void asyncInvoke(Long aLong, ResultFuture<Long>
>>>> resultFuture) {}
>>>>     },
>>>>     24,
>>>>     TimeUnit.HOURS,
>>>>     1)
>>>>     .print();
>>>>
>>>> This pipeline will completely backpressure the source, and checkpoints
>>>> do not progress even though they are unaligned. It seems that even the
>>>> source cannot take a checkpoint, which surprises me because this is using
>>>> the new source interface.
>>>>
>>>> Does anyone know why this happens and if there may be a solution?
>>>>
>>>> Thanks
>>>> Gyula
>>>>
>>>
