Re: Unaligned checkpoint blocked by long Async operation

2024-03-17 Thread Zakelly Lan
I agree. I have also created https://issues.apache.org/jira/browse/FLINK-34704 for
tracking and further discussion.


Best,
Zakelly

On Fri, Mar 15, 2024 at 2:59 PM Gyula Fóra wrote:

> Posting this to dev as well...
>
> Thanks Zakelly,
> Sounds like a solution could be to add a new variant of yield that
> would also yield to the checkpoint barrier. That way operator
> implementations could decide whether any state modification may
> have happened, and could optionally allow a checkpoint to be taken in the
> "middle of record processing".
>
> Gyula

Re: Unaligned checkpoint blocked by long Async operation

2024-03-15 Thread Gyula Fóra
Posting this to dev as well...

Thanks Zakelly,
Sounds like a solution could be to add a new variant of yield that
would also yield to the checkpoint barrier. That way operator
implementations could decide whether any state modification may
have happened, and could optionally allow a checkpoint to be taken in the
"middle of record processing".

Gyula

On Fri, Mar 15, 2024 at 3:49 AM Zakelly Lan wrote:

> Hi Gyula,
>
> Processing a checkpoint halfway through `processElement` is problematic. The
> current element will not be included in the input in-flight data, and we
> cannot assume that user code has already applied its effect to the state. So
> the best way is to treat `processElement` as an 'atomic' operation. I guess
> that's why the priority of the cp barrier is set low.
> However, the AsyncWaitOperator is a special case where we know the element
> blocked at `addToWorkQueue` has not yet been passed to the userFunction.
> Thus I'd suggest putting the element in the queue when the cp barrier
> comes, and taking a snapshot of the whole queue afterwards. That would solve
> the problem, but this approach also requires some code modifications to
> the mailbox executor.
>
>
> Best,
> Zakelly

Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Zakelly Lan
Hi Gyula,

Processing a checkpoint halfway through `processElement` is problematic. The
current element will not be included in the input in-flight data, and we
cannot assume that user code has already applied its effect to the state. So
the best way is to treat `processElement` as an 'atomic' operation. I guess
that's why the priority of the cp barrier is set low.
However, the AsyncWaitOperator is a special case where we know the element
blocked at `addToWorkQueue` has not yet been passed to the userFunction.
Thus I'd suggest putting the element in the queue when the cp barrier
comes, and taking a snapshot of the whole queue afterwards. That would solve
the problem, but this approach also requires some code modifications to
the mailbox executor.
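
The idea above can be sketched with a toy bounded queue in plain Java. This is only an illustration under stated assumptions, not Flink's AsyncWaitOperator code: `ToyAsyncQueue`, `tryAdd`, and `snapshotOnBarrier` are hypothetical names, and the real change would also have to touch the mailbox executor as noted above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy bounded queue. Hypothetical stand-in, not Flink's AsyncWaitOperator queue. */
class ToyAsyncQueue {
    private final int capacity;
    private final Deque<Long> inFlight = new ArrayDeque<>();

    ToyAsyncQueue(int capacity) {
        this.capacity = capacity;
    }

    /** Normal path: refuses the element when the queue is full. */
    boolean tryAdd(long element) {
        if (inFlight.size() >= capacity) {
            return false;
        }
        inFlight.addLast(element);
        return true;
    }

    /**
     * Barrier path: the blocked element has not reached the user function yet,
     * so it is admitted past the capacity limit and snapshotted with the rest.
     */
    List<Long> snapshotOnBarrier(long blockedElement) {
        inFlight.addLast(blockedElement);
        return new ArrayList<>(inFlight);
    }

    public static void main(String[] args) {
        ToyAsyncQueue queue = new ToyAsyncQueue(1);
        queue.tryAdd(1L);
        boolean accepted = queue.tryAdd(2L); // false: the queue is full
        List<Long> snapshot = queue.snapshotOnBarrier(2L);
        System.out.println(accepted + " " + snapshot); // prints "false [1, 2]"
    }
}
```

The point of the sketch is that the blocked element ends up inside the snapshot instead of being lost between the in-flight input data and the operator state.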


Best,
Zakelly

On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra wrote:

> Thank you for the detailed analysis Zakelly.
>
> I think we should consider whether yield should process checkpoint
> barriers because this puts quite a serious limitation on the unaligned
> checkpoints in these cases.
> Do you know what is the reason behind the current priority setting? Is
> there a problem with processing the barrier here?
>
> Gyula


Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Gyula Fóra
Thank you for the detailed analysis Zakelly.

I think we should consider whether yield should process checkpoint barriers
because this puts quite a serious limitation on the unaligned checkpoints
in these cases.
Do you know what is the reason behind the current priority setting? Is
there a problem with processing the barrier here?

Gyula

On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan wrote:

> Hi Gyula,
>
> Well, I tried your example in a local mini-cluster, and it seems the source
> can take checkpoints but the checkpoint blocks in the downstream
> AsyncWaitOperator. IIUC, the unaligned checkpoint barrier has to wait until
> the current `processElement` finishes its execution. In your example, the
> element queue of `AsyncWaitOperator` ends up full and `processElement` is
> blocked at `addToWorkQueue`. Even though it calls
> `mailboxExecutor.yield();`, the checkpoint barrier is still left
> unprocessed since the priority of the barrier is -1, lower than the priority
> `yield()` handles. I verified this using single-step debugging.
>
> If one element finishes its async I/O, the cp barrier can be
> processed afterwards. For example:
> ```
> env.getCheckpointConfig().enableUnalignedCheckpoints();
> env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
> env.getConfig().setParallelism(1);
> AsyncDataStream.orderedWait(
>                 env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>                 new AsyncFunction<Long, Long>() {
>                     boolean first = true;
>
>                     @Override
>                     public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>                         if (first) {
>                             Executors.newSingleThreadExecutor().execute(() -> {
>                                 try {
>                                     // Complete after 20s, only for the first element.
>                                     Thread.sleep(20000);
>                                 } catch (Throwable e) {
>                                     // Ignored: this is only a repro.
>                                 }
>                                 LOG.info("Complete one");
>                                 resultFuture.complete(Collections.singleton(1L));
>                             });
>                             first = false;
>                         }
>                     }
>                 },
>                 24,
>                 TimeUnit.HOURS,
>                 1)
>         .print();
> ```
> Checkpoint 1 finishes normally once the "Complete one" log line is
> printed.
>
> I guess users have no way to work around this problem at the moment; we
> might optimize this later.
>
>
> Best,
> Zakelly


Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Zakelly Lan
Hi Gyula,

Well, I tried your example in a local mini-cluster, and it seems the source
can take checkpoints but the checkpoint blocks in the downstream
AsyncWaitOperator. IIUC, the unaligned checkpoint barrier has to wait until
the current `processElement` finishes its execution. In your example, the
element queue of `AsyncWaitOperator` ends up full and `processElement` is
blocked at `addToWorkQueue`. Even though it calls
`mailboxExecutor.yield();`, the checkpoint barrier is still left
unprocessed since the priority of the barrier is -1, lower than the priority
`yield()` handles. I verified this using single-step debugging.
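
To illustrate the priority rule described above, here is a minimal toy model in plain Java. It is not Flink's mailbox code: `ToyMailbox`, `tryYield`, and the operator priority of 1 are assumptions made for illustration; only the barrier priority of -1 comes from the analysis above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Toy model of a priority mailbox. Hypothetical; not Flink's MailboxExecutor. */
class ToyMailbox {
    /** A queued action tagged with a priority. */
    static final class Mail {
        final String name;
        final int priority;

        Mail(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();
    final List<String> executed = new ArrayList<>();

    void put(String name, int priority) {
        queue.addLast(new Mail(name, priority));
    }

    /**
     * Runs the oldest mail whose priority is at least minPriority.
     * Returns false when no such mail is queued, i.e. the caller stays blocked.
     */
    boolean tryYield(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                executed.add(mail.name);
                return true;
            }
        }
        return false;
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        ToyMailbox mailbox = new ToyMailbox();
        int operatorPriority = 1; // assumed value, for illustration only
        mailbox.put("checkpoint-barrier", -1); // barrier priority quoted in the thread

        // The blocked operator yields, but the barrier is below its priority.
        System.out.println(mailbox.tryYield(operatorPriority)); // prints "false"

        // A completed async result is enqueued at operator priority and can run.
        mailbox.put("async-result", operatorPriority);
        System.out.println(mailbox.tryYield(operatorPriority)); // prints "true"
        System.out.println(mailbox.executed + " pending=" + mailbox.pending());
    }
}
```

In this model the barrier stays queued for as long as the caller only yields at its own priority, which matches the observation that the checkpoint proceeds only after an async result frees a slot and `processElement` can return.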

If one element finishes its async I/O, the cp barrier can be
processed afterwards. For example:
```
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
env.getConfig().setParallelism(1);
AsyncDataStream.orderedWait(
                env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
                new AsyncFunction<Long, Long>() {
                    boolean first = true;

                    @Override
                    public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
                        if (first) {
                            Executors.newSingleThreadExecutor().execute(() -> {
                                try {
                                    // Complete after 20s, only for the first element.
                                    Thread.sleep(20000);
                                } catch (Throwable e) {
                                    // Ignored: this is only a repro.
                                }
                                LOG.info("Complete one");
                                resultFuture.complete(Collections.singleton(1L));
                            });
                            first = false;
                        }
                    }
                },
                24,
                TimeUnit.HOURS,
                1)
        .print();
```
Checkpoint 1 finishes normally once the "Complete one" log line is
printed.

I guess users have no way to work around this problem at the moment; we
might optimize this later.


Best,
Zakelly

On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra wrote:

> Hey all!
>
> I encountered a strange and unexpected behaviour when trying to use
> unaligned checkpoints with AsyncIO.
>
> If the async operation queue is full and backpressures the pipeline
> completely, then unaligned checkpoints cannot be completed. To me this
> sounds counterintuitive because one of the benefits of the AsyncIO would be
> that we can simply checkpoint the queue and not have to wait for the
> completion.
>
> To repro you can simply run:
>
> AsyncDataStream.orderedWait(
>                 env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>                 new AsyncFunction<Long, Long>() {
>                     @Override
>                     public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>                 },
>                 24,
>                 TimeUnit.HOURS,
>                 1)
>         .print();
>
> This pipeline completely backpressures the source, and checkpoints do
> not progress even though they are unaligned. It seems that even the source
> cannot take a checkpoint, which surprises me because it uses the
> new source interface.
>
> Does anyone know why this happens and if there may be a solution?
>
> Thanks
> Gyula
>


Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Gyula Fóra
Hey all!

I encountered a strange and unexpected behaviour when trying to use
unaligned checkpoints with AsyncIO.

If the async operation queue is full and backpressures the pipeline
completely, then unaligned checkpoints cannot be completed. To me this
sounds counterintuitive because one of the benefits of the AsyncIO would be
that we can simply checkpoint the queue and not have to wait for the
completion.

To repro you can simply run:

AsyncDataStream.orderedWait(
                env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
                new AsyncFunction<Long, Long>() {
                    @Override
                    public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
                },
                24,
                TimeUnit.HOURS,
                1)
        .print();

This pipeline completely backpressures the source, and checkpoints do
not progress even though they are unaligned. It seems that even the source
cannot take a checkpoint, which surprises me because it uses the
new source interface.

Does anyone know why this happens and if there may be a solution?

Thanks
Gyula