[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-08-04 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17170641#comment-17170641
 ] 

Piotr Nowojski commented on FLINK-18748:


Thanks for the contribution, [~wayland], and for the reviews, [~klion26] and 
[~roman_khachatryan].

> Savepoint would be queued unexpected if pendingCheckpoints less than 
> maxConcurrentCheckpoints
> -
>
> Key: FLINK-18748
> URL: https://issues.apache.org/jira/browse/FLINK-18748
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.11.0, 1.11.1
>Reporter: Congxian Qiu(klion26)
>Assignee: tao wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2
>
>
> Inspired by a [user-zh 
> email|http://apache-flink.147419.n8.nabble.com/flink-1-11-rest-api-saveppoint-td5497.html].
> After FLINK-17342, when a checkpoint/savepoint is triggered, we check 
> whether the request can be executed in 
> {{CheckpointRequestDecider#chooseRequestToExecute}}. The logic is as follows:
> {code:java}
> Preconditions.checkState(Thread.holdsLock(lock));
> // 1. nothing to do if a request is already being triggered or the queue is empty
> if (isTriggering || queuedRequests.isEmpty()) {
>     return Optional.empty();
> }
> // 2. too many ongoing checkpoints/savepoints: only a forced request may proceed
> if (pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts) {
>     return Optional.of(queuedRequests.first())
>             .filter(CheckpointTriggerRequest::isForce)
>             .map(unused -> queuedRequests.pollFirst());
> }
> // 3. check the timestamp of the last completed checkpoint (minimum pause)
> long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
> if (nextTriggerDelayMillis > 0) {
>     return onTooEarly(nextTriggerDelayMillis);
> }
> return Optional.of(queuedRequests.pollFirst());
> {code}
> However, if {{pendingCheckpointsSizeSupplier.get()}} < 
> {{maxConcurrentCheckpointAttempts}} and the request is a savepoint, the 
> savepoint still goes through the min-pause check in step 3 and may have to wait. 
> I think we should trigger the savepoint immediately if 
> {{pendingCheckpointsSizeSupplier.get()}} < {{maxConcurrentCheckpointAttempts}}.
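As a rough illustration of the ordering above, here is a minimal, hypothetical model of the three steps. The class StepModel and its Outcome enum are invented for this sketch, and the queue, lock and suppliers are replaced by plain parameters, so it is not Flink code. It reproduces the asymmetry the description points at: a forced request bypasses the limit in step 2 when all slots are busy, yet with a free slot it falls through to the min-pause check in step 3.

```java
// Hypothetical model of the three steps in chooseRequestToExecute (not Flink code).
public class StepModel {

    enum Outcome { NOTHING_TO_DO, EXECUTE, STAY_QUEUED, TOO_EARLY }

    static Outcome decide(boolean isTriggering, boolean queueEmpty, boolean isForce,
                          int pending, int maxConcurrentCheckpointAttempts,
                          long nextTriggerDelayMillis) {
        // 1. a request is already being triggered, or nothing is queued
        if (isTriggering || queueEmpty) {
            return Outcome.NOTHING_TO_DO;
        }
        // 2. no free slot: only a forced request bypasses the limit
        if (pending >= maxConcurrentCheckpointAttempts) {
            return isForce ? Outcome.EXECUTE : Outcome.STAY_QUEUED;
        }
        // 3. free slot, but the minimum pause has not elapsed: handed to onTooEarly
        if (nextTriggerDelayMillis > 0) {
            return Outcome.TOO_EARLY;
        }
        return Outcome.EXECUTE;
    }

    public static void main(String[] args) {
        // forced savepoint, all slots busy, min pause not elapsed
        System.out.println(decide(false, false, true, 1, 1, 5_000)); // prints EXECUTE
        // same savepoint, one free slot, min pause not elapsed
        System.out.println(decide(false, false, true, 0, 1, 5_000)); // prints TOO_EARLY
    }
}
```

TOO_EARLY is only where the request is handed to onTooEarly; as noted later in the thread, onTooEarly executes forced requests immediately, so the actual wait concerns savepoints that are not forced (unaligned checkpoints mode).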



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-08-04 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17170632#comment-17170632
 ] 

Piotr Nowojski commented on FLINK-18748:


merged to master as e422d42e09^ and e422d42e09



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-08-02 Thread tao wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169597#comment-17169597
 ] 

tao wang commented on FLINK-18748:
--

Hi [~klion26], [~roman_khachatryan]: I have created a pull request. Would you 
like to review it?

Thanks.



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-31 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168985#comment-17168985
 ] 

Roman Khachatryan commented on FLINK-18748:
---

Exactly, thanks for the clarification [~wayland].



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-31 Thread tao wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168655#comment-17168655
 ] 

tao wang commented on FLINK-18748:
--

[~roman_khachatryan] Thanks for your advice; I will skip the 
nextTriggerDelayMillis call.

I think what you meant is: skip this call if the request is forced or is not 
periodic.

I will return {{Optional.of(queuedRequests.pollFirst())}} immediately if 
{{queuedRequests.first()}} is forced or is not periodic.
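A minimal sketch of the plan just described, under stated assumptions: the class SkipDelaySketch, its Request type and the clockCalls counter are hypothetical, and a LongSupplier stands in for the clock behind nextTriggerDelayMillis. The request-type check runs first, so forced or non-periodic requests never consult the clock, which also avoids the system-call cost Roman raised.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.function.LongSupplier;

// Hypothetical sketch of step 3 with the min-pause check limited to
// periodic, non-forced requests (not Flink code).
public class SkipDelaySketch {

    static final class Request {
        final boolean isForce;
        final boolean isPeriodic;

        Request(boolean isForce, boolean isPeriodic) {
            this.isForce = isForce;
            this.isPeriodic = isPeriodic;
        }
    }

    final Deque<Request> queuedRequests = new ArrayDeque<>();
    private final LongSupplier clock;  // stands in for the (relatively costly) time lookup
    private final long lastCompletionMs;
    private final long minPauseMs;
    int clockCalls;                    // how often the clock was consulted

    SkipDelaySketch(LongSupplier clock, long lastCompletionMs, long minPauseMs) {
        this.clock = clock;
        this.lastCompletionMs = lastCompletionMs;
        this.minPauseMs = minPauseMs;
    }

    Optional<Request> chooseAfterSlotCheck() {
        Request first = queuedRequests.peekFirst();
        if (first == null) {
            return Optional.empty();
        }
        // forced or non-periodic (e.g. user-triggered) requests skip the min-pause check
        if (first.isForce || !first.isPeriodic) {
            return Optional.of(queuedRequests.pollFirst());
        }
        clockCalls++;
        long nextTriggerDelayMillis = lastCompletionMs + minPauseMs - clock.getAsLong();
        if (nextTriggerDelayMillis > 0) {
            // too early: drop the periodic request (the real code fails it and reschedules)
            queuedRequests.pollFirst();
            return Optional.empty();
        }
        return Optional.of(queuedRequests.pollFirst());
    }
}
```

With lastCompletionMs = 900, minPauseMs = 500 and a clock reading 1000, a periodic request is 400 ms too early and is dropped, while a non-forced, non-periodic request is returned without any clock lookup.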

 



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-31 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168555#comment-17168555
 ] 

Roman Khachatryan commented on FLINK-18748:
---

Thanks for your analysis and proposal, [~wayland].

I think nextTriggerDelayMillis might actually be quite heavy, as it involves a 
system call.

Why not skip this call at all if the request is periodic (and not forced) in 
chooseRequestToExecute?



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-30 Thread tao wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168415#comment-17168415
 ] 

tao wang commented on FLINK-18748:
--

[~klion26] OK, I have read the doc, thanks.

Some thoughts on this issue:

If {{pendingCheckpointsSizeSupplier.get() < maxConcurrentCheckpointAttempts}}, 
step 2 is skipped, and if {{nextTriggerDelayMillis > 0}}, onTooEarly is called:

{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
    CheckpointTriggerRequest first = queuedRequests.first();

    // first.isForce() is false if the job runs in unaligned checkpoints mode
    if (first.isForce()) {
        return Optional.of(queuedRequests.pollFirst());
    } else if (first.isPeriodic) {
        queuedRequests.pollFirst().completeExceptionally(
                new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
        rescheduleTrigger.accept(nextTriggerDelayMillis);
        return Optional.empty();
    } else {
        // reached when the request is neither forced (unaligned checkpoints mode)
        // nor periodic (savepoint triggered by the user): nothing is polled or
        // rescheduled, and Optional.empty() is returned
        return Optional.empty();
    }
}
{code}
 

In this case, onTooEarly returns Optional.empty() if a user triggers a 
savepoint too early while the job is running in unaligned checkpoints mode.

I think this is why some savepoints get stuck in the IN_PROGRESS state.
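The stuck case can be reproduced with a small, hypothetical model of the onTooEarly branches quoted above. The class OnTooEarlyOld and its counters are invented, and the exception and rescheduleTrigger are reduced to counters. For a request that is neither forced nor periodic, nothing is polled and nothing is rescheduled, so the request stays at the head of the queue; whether some later event retries it is outside this model.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical model of the 1.11 onTooEarly branches quoted above (not Flink code).
public class OnTooEarlyOld {

    static final class Request {
        final boolean isForce;
        final boolean isPeriodic;

        Request(boolean isForce, boolean isPeriodic) {
            this.isForce = isForce;
            this.isPeriodic = isPeriodic;
        }
    }

    final Deque<Request> queuedRequests = new ArrayDeque<>();
    int rejected;     // periodic requests failed with MINIMUM_TIME_BETWEEN_CHECKPOINTS
    int reschedules;  // calls that would go to rescheduleTrigger

    Optional<Request> onTooEarly(long nextTriggerDelayMillis) {
        Request first = queuedRequests.peekFirst();
        if (first.isForce) {
            return Optional.of(queuedRequests.pollFirst());
        } else if (first.isPeriodic) {
            queuedRequests.pollFirst();
            rejected++;
            reschedules++;
            return Optional.empty();
        }
        // neither forced nor periodic: left in the queue, no retry scheduled
        return Optional.empty();
    }
}
```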

 

I might change onTooEarly to this:

 
{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
    CheckpointTriggerRequest first = queuedRequests.first();
    if (first.isPeriodic) {
        // periodic checkpoints still respect the minimum pause: fail and retry later
        queuedRequests.pollFirst().completeExceptionally(
                new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
        rescheduleTrigger.accept(nextTriggerDelayMillis);
        return Optional.empty();
    }
    // non-periodic requests (e.g. user-triggered savepoints) are executed immediately
    return Optional.of(queuedRequests.pollFirst());
}
{code}
By the way, I think nextTriggerDelayMillis itself does not wait, so there is 
no need to change any code in chooseRequestToExecute.
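For comparison, a minimal, hypothetical model of the proposed onTooEarly (class and field names are invented; the exception and rescheduleTrigger are reduced to a recorded delay). Only periodic requests are rejected and rescheduled; everything else is returned for execution.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical model of the proposed onTooEarly (not Flink code).
public class OnTooEarlyProposed {

    static final class Request {
        final boolean isForce;
        final boolean isPeriodic;

        Request(boolean isForce, boolean isPeriodic) {
            this.isForce = isForce;
            this.isPeriodic = isPeriodic;
        }
    }

    final Deque<Request> queuedRequests = new ArrayDeque<>();
    long lastRescheduleDelayMillis = -1;  // -1 means rescheduleTrigger was never called

    Optional<Request> onTooEarly(long nextTriggerDelayMillis) {
        Request first = queuedRequests.peekFirst();
        if (first.isPeriodic) {
            // periodic request: reject it and retry once the minimum pause has passed
            queuedRequests.pollFirst();
            lastRescheduleDelayMillis = nextTriggerDelayMillis;
            return Optional.empty();
        }
        // non-periodic request (e.g. a user-triggered savepoint): execute now
        return Optional.of(queuedRequests.pollFirst());
    }
}
```

As written, this version no longer looks at isForce, so a forced periodic request would now be rejected; the later variant in this thread, which returns immediately if the request is forced or not periodic, keeps that case.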

 

 



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-30 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168357#comment-17168357
 ] 

Congxian Qiu(klion26) commented on FLINK-18748:
---

[~roman_khachatryan] Could you please help assign this ticket to [~wayland]? 
Thanks.
[~wayland] Please make sure you have read the [contribution 
documentation|https://flink.apache.org/contributing/how-to-contribute.html] 
before implementing, and ping us for a review after you have filed a PR. Thanks.



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-30 Thread tao wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167917#comment-17167917
 ] 

tao wang commented on FLINK-18748:
--

[~klion26] Yes, I'd like to fix this. It's my pleasure.



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-30 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167912#comment-17167912
 ] 

Congxian Qiu(klion26) commented on FLINK-18748:
---

[~wayland] Do you want to fix this problem?



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-30 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167888#comment-17167888
 ] 

Roman Khachatryan commented on FLINK-18748:
---

I see your point, [~klion26]: in unaligned checkpoint (UC) mode, a savepoint can 
wait unnecessarily.

I think this can easily be fixed by calling onTooEarly only for periodic 
requests.



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-30 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167719#comment-17167719
 ] 

Roman Khachatryan commented on FLINK-18748:
---

Thanks for the clarification, [~klion26].

There is also a check in onTooEarly: a forced request is executed 
immediately:
{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
    CheckpointTriggerRequest first = queuedRequests.first();
    if (first.isForce()) {
        return Optional.of(queuedRequests.pollFirst());
    }
    // ...
{code}



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints

2020-07-29 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167598#comment-17167598
 ] 

Congxian Qiu(klion26) commented on FLINK-18748:
---

Hi [~roman_khachatryan], thanks for your reply. If 
{{pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts}}, 
step 2 applies as you describe. But if {{pendingCheckpointsSizeSupplier.get() < 
maxConcurrentCheckpointAttempts}}, we skip step 2 and only check the min pause 
time in step 3. In this case, I think the savepoint should be triggered.

Suppose {{maxConcurrentCheckpointAttempts}} == 1 and there is no ongoing 
checkpoint/savepoint. When the user triggers a savepoint, we skip step 2 of the 
description; step 3 only checks the min pause time, so the savepoint has to 
wait.

 



[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected

2020-07-29 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166946#comment-17166946
 ] 

Roman Khachatryan commented on FLINK-18748:
---

Thanks [~klion26],

From the code above, I see that in the mentioned case (2):

{{.filter(CheckpointTriggerRequest::isForce)}}

which means that a forced request *will* be triggered.

As savepoints *are* forced (unless running with unaligned checkpoints), this 
means that a savepoint request *will* be triggered, even if pendingCheckpoints >= 
maxConcurrentCheckpoints.
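Step 2 in isolation, as a hypothetical sketch (ForcedRequestSketch and its Request type are illustrative, not Flink classes): once the limit is reached, the filter on isForce lets a forced request through and leaves a non-forced one queued. This path is only taken when pendingCheckpoints >= maxConcurrentCheckpoints.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical sketch of step 2 of the quoted logic; not Flink's actual code.
class ForcedRequestSketch {

    // Hypothetical request type: a name plus the "force" flag step 2 inspects.
    static final class Request {
        private final String name;
        private final boolean force;

        Request(String name, boolean force) {
            this.name = name;
            this.force = force;
        }

        String name() {
            return name;
        }

        boolean isForce() {
            return force;
        }
    }

    // Only called when pending >= maxConcurrentCheckpointAttempts.
    static Optional<Request> chooseAtConcurrencyLimit(Deque<Request> queue) {
        if (queue.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(queue.peekFirst())
                .filter(Request::isForce)           // non-forced requests stay queued
                .map(unused -> queue.pollFirst());  // a forced request is dequeued
    }

    public static void main(String[] args) {
        Deque<Request> savepoints = new ArrayDeque<>();
        savepoints.add(new Request("savepoint", true)); // forced (aligned checkpoints assumed)
        System.out.println(chooseAtConcurrencyLimit(savepoints).map(Request::name).orElse("queued")); // prints "savepoint"

        Deque<Request> checkpoints = new ArrayDeque<>();
        checkpoints.add(new Request("checkpoint", false)); // periodic, not forced
        System.out.println(chooseAtConcurrencyLimit(checkpoints).map(Request::name).orElse("queued")); // prints "queued"
    }
}
```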






[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected

2020-07-28 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166877#comment-17166877
 ] 

Congxian Qiu(klion26) commented on FLINK-18748:
---

[~pnowojski] [~roman_khachatryan] What do you think about this problem? If it 
is valid, I can help fix it.



