[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17170641#comment-17170641 ]

Piotr Nowojski commented on FLINK-18748:
----------------------------------------

Thanks for the contribution [~wayland] and for the reviews [~klion26] [~roman_khachatryan]

> Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18748
>                 URL: https://issues.apache.org/jira/browse/FLINK-18748
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0, 1.11.1
>            Reporter: Congxian Qiu(klion26)
>            Assignee: tao wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.2
>
>
> Inspired by a [user-zh email|http://apache-flink.147419.n8.nabble.com/flink-1-11-rest-api-saveppoint-td5497.html]
> After FLINK-17342, when triggering a checkpoint/savepoint, we check whether the request can be triggered in {{CheckpointRequestDecider#chooseRequestToExecute}}. The logic is as follows:
> {code:java}
> Preconditions.checkState(Thread.holdsLock(lock));
> // 1. a trigger is already in progress, or nothing is queued
> if (isTriggering || queuedRequests.isEmpty()) {
>     return Optional.empty();
> }
> // 2. too many ongoing checkpoints/savepoints
> if (pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts) {
>     return Optional.of(queuedRequests.first())
>         .filter(CheckpointTriggerRequest::isForce)
>         .map(unused -> queuedRequests.pollFirst());
> }
> // 3. check the timestamp of the last completed checkpoint
> long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
> if (nextTriggerDelayMillis > 0) {
>     return onTooEarly(nextTriggerDelayMillis);
> }
> return Optional.of(queuedRequests.pollFirst());
> {code}
> But if currently {{pendingCheckpointsSizeSupplier.get()}} < {{maxConcurrentCheckpointAttempts}} and the request is a savepoint, the savepoint will still wait some time in step 3.
> I think we should trigger the savepoint immediately if {{pendingCheckpointsSizeSupplier.get()}} < {{maxConcurrentCheckpointAttempts}}.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17170632#comment-17170632 ]

Piotr Nowojski commented on FLINK-18748:
----------------------------------------

merged to master as e422d42e09^ and e422d42e09
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169597#comment-17169597 ]

tao wang commented on FLINK-18748:
----------------------------------

Hi [~klion26] [~roman_khachatryan]: I have created a pull request; would you like to review it? Thanks.
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168985#comment-17168985 ]

Roman Khachatryan commented on FLINK-18748:
-------------------------------------------

Exactly, thanks for the clarification [~wayland].
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168655#comment-17168655 ]

tao wang commented on FLINK-18748:
----------------------------------

[~roman_khachatryan] Thanks for your advice, I will skip nextTriggerDelayMillis.

Maybe what you want to say is: skip this call if the request is forced or is not periodic.

I will return Optional.of(queuedRequests.pollFirst()) immediately if queuedRequests.first() is forced or is not periodic.
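The shortcut proposed in the comment above can be sketched as a small, single-threaded model. This is not the Flink class and not the merged patch: `RequestSketch`, `DeciderSketch`, the plain `ArrayDeque` queue, and the `delayComputations` counter are hypothetical stand-ins, and the lock check and the rescheduling side effects are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Hypothetical stand-in for CheckpointTriggerRequest; only the flags discussed in the thread. */
final class RequestSketch {
    final String name;
    final boolean force;
    final boolean periodic;

    RequestSketch(String name, boolean force, boolean periodic) {
        this.name = name;
        this.force = force;
        this.periodic = periodic;
    }
}

/** Simplified model of the decision steps; an assumption-laden sketch, not Flink code. */
final class DeciderSketch {
    private final Deque<RequestSketch> queuedRequests = new ArrayDeque<>();
    private final int maxConcurrentCheckpointAttempts;

    boolean isTriggering;         // step 1 flag
    int pendingCheckpoints;       // stands in for pendingCheckpointsSizeSupplier.get()
    long nextTriggerDelayMillis;  // stands in for nextTriggerDelayMillis(lastCompletionMs)
    int delayComputations;        // counts how often step 3 consulted the delay

    DeciderSketch(int maxConcurrentCheckpointAttempts) {
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
    }

    void enqueue(RequestSketch request) {
        queuedRequests.addLast(request);
    }

    Optional<RequestSketch> chooseRequestToExecute() {
        // 1. a trigger is already in progress, or nothing is queued
        if (isTriggering || queuedRequests.isEmpty()) {
            return Optional.empty();
        }
        RequestSketch first = queuedRequests.peekFirst();
        // 2. too many ongoing checkpoints/savepoints: only forced requests pass
        if (pendingCheckpoints >= maxConcurrentCheckpointAttempts) {
            return Optional.of(first)
                    .filter(r -> r.force)
                    .map(unused -> queuedRequests.pollFirst());
        }
        // Proposed shortcut: forced or non-periodic requests skip the delay check entirely
        if (first.force || !first.periodic) {
            return Optional.of(queuedRequests.pollFirst());
        }
        // 3. only periodic, non-forced requests are subject to the delay
        delayComputations++;
        if (nextTriggerDelayMillis > 0) {
            queuedRequests.pollFirst(); // the real code fails this request and reschedules
            return Optional.empty();
        }
        return Optional.of(queuedRequests.pollFirst());
    }
}
```

In this model, a user-triggered savepoint (neither forced nor periodic) enqueued while the delay is still positive is returned at once and the delay is never consulted, whereas a periodic request in the same state is dropped as too early.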
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168555#comment-17168555 ]

Roman Khachatryan commented on FLINK-18748:
-------------------------------------------

Thanks for your analysis and proposal [~wayland],

I think nextTriggerDelayMillis might actually be quite heavy, as it involves a system call.

Why not skip this call at all if the request is periodic (and not forced) in chooseRequestToExecute?
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168415#comment-17168415 ]

tao wang commented on FLINK-18748:
----------------------------------

[~klion26] OK, I have read the doc, thanks.

I have some thoughts on this issue:

If pendingCheckpointsSizeSupplier.get() < maxConcurrentCheckpointAttempts, we skip step 2, and if nextTriggerDelayMillis > 0, onTooEarly is called.
{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
    CheckpointTriggerRequest first = queuedRequests.first();
    // first.isForce() will be false if running in unaligned checkpoints mode
    if (first.isForce()) {
        return Optional.of(queuedRequests.pollFirst());
    } else if (first.isPeriodic) {
        // first.isPeriodic will be false too if the savepoint is triggered by the user
        queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
        rescheduleTrigger.accept(nextTriggerDelayMillis);
        return Optional.empty();
    } else {
        // we will get Optional.empty()
        return Optional.empty();
    }
}
{code}
In this case, onTooEarly returns Optional.empty() if a user triggers a savepoint too early while the job is running in unaligned checkpoints mode.

I think this is the reason why some savepoints get stuck in the IN_PROGRESS state.

Maybe I'll change onTooEarly to this:
{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
    CheckpointTriggerRequest first = queuedRequests.first();
    if (first.isPeriodic) {
        queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
        rescheduleTrigger.accept(nextTriggerDelayMillis);
        return Optional.empty();
    }
    return Optional.of(queuedRequests.pollFirst());
}
{code}
By the way, I think nextTriggerDelayMillis does not itself wait for any time, so it is not necessary to change any code in chooseRequestToExecute.
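The two onTooEarly variants discussed in the comment above can be compared side by side in a reduced model. Everything here is a hypothetical stand-in: `TooEarlySketch`, its nested `Request` type, the `failed` flag (replacing completeExceptionally), and the `ArrayDeque` queue are assumptions made for illustration, and rescheduling is omitted.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Reduced model of the two onTooEarly variants from the comment; not Flink code. */
final class TooEarlySketch {

    /** Hypothetical request type carrying only the two flags and a failure marker. */
    static final class Request {
        final boolean force;
        final boolean periodic;
        boolean failed; // stands in for completeExceptionally(...)

        Request(boolean force, boolean periodic) {
            this.force = force;
            this.periodic = periodic;
        }
    }

    /** Behaviour as quoted: a non-forced, non-periodic request stays queued. */
    static Optional<Request> onTooEarlyOriginal(Deque<Request> queuedRequests) {
        Request first = queuedRequests.getFirst();
        if (first.force) {
            return Optional.of(queuedRequests.pollFirst());
        } else if (first.periodic) {
            queuedRequests.pollFirst().failed = true;
            return Optional.empty();
        } else {
            return Optional.empty(); // request is neither run nor removed
        }
    }

    /** Behaviour as proposed: only periodic requests are rejected as too early. */
    static Optional<Request> onTooEarlyProposed(Deque<Request> queuedRequests) {
        Request first = queuedRequests.getFirst();
        if (first.periodic) {
            queuedRequests.pollFirst().failed = true;
            return Optional.empty();
        }
        return Optional.of(queuedRequests.pollFirst());
    }

    /** Convenience helper for building a one-element queue. */
    static Deque<Request> queueOf(Request request) {
        Deque<Request> queue = new ArrayDeque<>();
        queue.addLast(request);
        return queue;
    }
}
```

For a savepoint that is neither forced nor periodic, the original variant returns an empty result and leaves the request in the queue, while the proposed variant hands it back for execution. Note that, as written in the comment, the proposed variant tests isPeriodic before isForce, so a request that is both periodic and forced would be treated differently from the original.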
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17168357#comment-17168357 ]

Congxian Qiu(klion26) commented on FLINK-18748:
-----------------------------------------------

[~roman_khachatryan] Could you please help assign this ticket to [~wayland]? Thanks.

[~wayland] Please make sure you have read the [contribute documentation|https://flink.apache.org/contributing/how-to-contribute.html] before implementing, and you can ping us for review after filing a PR. Thanks.
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167917#comment-17167917 ]

tao wang commented on FLINK-18748:
----------------------------------

[~klion26] Yes, I'd like to fix this. It's my pleasure.
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167912#comment-17167912 ]

Congxian Qiu(klion26) commented on FLINK-18748:
-----------------------------------------------

[~wayland] Do you want to fix this problem?
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167888#comment-17167888 ]

Roman Khachatryan commented on FLINK-18748:
-------------------------------------------

I see your point [~klion26]: in UC mode, savepoint can wait unnecessarily.

I think this can be easily fixed by calling onTooEarly only for periodic requests.
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167719#comment-17167719 ]

Roman Khachatryan commented on FLINK-18748:
-------------------------------------------

Thanks for the clarification [~klion26],

in onTooEarly there is a check too: if it's a forced request, it will be executed immediately:
{code:java}
private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
    CheckpointTriggerRequest first = queuedRequests.first();
    if (first.isForce()) {
        return Optional.of(queuedRequests.pollFirst());
    }
    // (rest of the method not quoted here)
{code}
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected if pendingCheckpoints less than maxConcurrentCheckpoints
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167598#comment-17167598 ]

Congxian Qiu(klion26) commented on FLINK-18748:
-----------------------------------------------

Hi [~roman_khachatryan], thanks for your reply.

If {{pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts}}, we go through case 2; but if {{pendingCheckpointsSizeSupplier.get() < maxConcurrentCheckpointAttempts}}, we skip case 2 and just check the min pause time in case 3. In this case, I think the savepoint should be triggered.

Suppose we have {{maxConcurrentCheckpointAttempts}} == 1 and there is currently no ongoing checkpoint/savepoint. When the user triggers a savepoint, we skip case 2 in the description. In case 3 we just check the min pause time, so we'll wait some time.
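The scenario in the comment above can be worked through with concrete numbers. The thread does not show the body of nextTriggerDelayMillis, so the formula below is an assumption: it treats the delay as whatever remains of a configured minimum pause since the last completed checkpoint. `MinPauseSketch` and both method names are hypothetical.

```java
/** Worked arithmetic for the "skip case 2, wait in case 3" scenario; not Flink code. */
final class MinPauseSketch {

    /**
     * Assumed shape of the delay: time still remaining until the minimum pause
     * after the last completed checkpoint has elapsed. Non-positive means "no wait".
     */
    static long remainingPauseMillis(long lastCompletionMs, long minPauseMs, long nowMs) {
        return lastCompletionMs + minPauseMs - nowMs;
    }

    /** True when case 2 is skipped (capacity is available) but case 3 still reports a delay. */
    static boolean reachesOnTooEarly(int pendingCheckpoints, int maxConcurrent, long delayMillis) {
        return pendingCheckpoints < maxConcurrent && delayMillis > 0;
    }
}
```

With a last completion at 10 000 ms, a minimum pause of 5 000 ms, and the savepoint requested at 12 000 ms, the remaining pause under this assumption is 10 000 + 5 000 - 12 000 = 3 000 ms. With no pending checkpoints and a limit of 1, case 2 is skipped and the positive delay sends the request into onTooEarly, which is the wait the comment describes.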
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166946#comment-17166946 ]

Roman Khachatryan commented on FLINK-18748:
-------------------------------------------

Thanks [~klion26],

From the code above I see in the mentioned case (2):

.filter(CheckpointTriggerRequest::isForce)

which means that a forced request *will* be triggered.

As savepoints *are* forced (unless running in unaligned checkpoints), this means that a savepoint request *will* be triggered, even if pendingCheckpoints >= maxConcurrentCheckpoints.
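The Optional chain referred to in the comment above can be isolated in a few lines. This is an illustrative sketch with hypothetical names (`ForcedFilterSketch`, its nested `Request`, and the `ArrayDeque` queue); only `Optional.of`, `filter`, and `map` are used as in the quoted snippet.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Isolates the filter/map idiom from case 2; not Flink code. */
final class ForcedFilterSketch {

    /** Hypothetical request type with only the force flag. */
    static final class Request {
        final boolean force;

        Request(boolean force) {
            this.force = force;
        }

        boolean isForce() {
            return force;
        }
    }

    /**
     * Looks at the head of the queue without removing it, keeps it only if it is
     * forced, and removes it from the queue only in that case. The queue must be
     * non-empty, which case 1 of the quoted logic guarantees.
     */
    static Optional<Request> pollIfForced(Deque<Request> queuedRequests) {
        return Optional.of(queuedRequests.getFirst())
                .filter(Request::isForce)
                .map(unused -> queuedRequests.pollFirst());
    }

    /** Convenience helper for building a one-element queue. */
    static Deque<Request> queueOf(Request request) {
        Deque<Request> queue = new ArrayDeque<>();
        queue.addLast(request);
        return queue;
    }
}
```

Because `map` runs only when `filter` let the value through, a forced head request is dequeued and returned, while a non-forced one yields an empty Optional and remains at the head of the queue.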
[jira] [Commented] (FLINK-18748) Savepoint would be queued unexpected
[ https://issues.apache.org/jira/browse/FLINK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166877#comment-17166877 ]

Congxian Qiu(klion26) commented on FLINK-18748:
-----------------------------------------------

[~pnowojski] [~roman_khachatryan] What do you think about this problem? If it is valid, I can help fix it.