[GitHub] [flink] Aitozi commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
Aitozi commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#issuecomment-507949361 Ping @pnowojski This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299785151 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint在默认的情况下仅用于恢复失败的作业,并不作保留,当程序取消时checkpoint就会被删除。当然,你可以配置周期性的策略来保留checkpoint。根据以下配置,这些保留的checkpoint在作业失败或取消时不会自动清除。这样,你就可以使用该checkpoint来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup`配置项定义了当作业取消时,对作业checkpoint的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业的checkpoint。 +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的checkpoint。仅当作业失败时,作业的checkpoint才会被启用。 -### Directory Structure +### 目录结构 -Similarly to [savepoints](savepoints.html), a checkpoint consists -of a meta data file and, depending on the state backend, some additional data -files. The meta data file and data files are stored in the directory that is -configured via `state.checkpoints.dir` in the configuration files, -and also can be specified for per job in the code. +与[savepoints](savepoints.html)相似,checkpoint由元数据文件、额外的数据文件(与state backend相关)组成。可通过配置文件中“state.checkpoints.dir”配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。 - Configure globally via configuration files + 通过配置文件全局配置 {% highlight yaml %} state.checkpoints.dir: hdfs:///checkpoints/ {% endhighlight %} - Configure for per job when constructing the state backend + 创建state backend对单个作业进行配置 {% highlight java %} env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/")); {% endhighlight %} -### Difference to Savepoints +### Checkpoint与Savepoint的区别 Review comment: ```suggestion ### Checkpoint 与 Savepoint 的区别 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
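The comments above all quote the same doc snippet that retains checkpoints via `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` and points `state.checkpoints.dir` (or a per-job state backend) at an HDFS path. For context, a minimal sketch putting those pieces together, assuming the Flink 1.8-era `StreamExecutionEnvironment` API the diff is discussing (interval, path, and class name are illustrative, not from the PR):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 seconds (interval is illustrative).
        env.enableCheckpointing(60_000);

        // Keep checkpoints after cancellation so the job can later be resumed;
        // as the reviewed text notes, they must then be cleaned up manually.
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Per-job override of the global state.checkpoints.dir setting
        // (the HDFS path is hypothetical).
        env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
    }
}
```

With `DELETE_ON_CANCELLATION` instead, the checkpoint survives only a failure, which is exactly the distinction the suggested wording fix ("才会被保留" rather than "才会被启用") is correcting.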
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299784600 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint在默认的情况下仅用于恢复失败的作业,并不作保留,当程序取消时checkpoint就会被删除。当然,你可以配置周期性的策略来保留checkpoint。根据以下配置,这些保留的checkpoint在作业失败或取消时不会自动清除。这样,你就可以使用该checkpoint来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup`配置项定义了当作业取消时,对作业checkpoint的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业的checkpoint。 +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的checkpoint。仅当作业失败时,作业的checkpoint才会被启用。 Review comment: ```suggestion - **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299785107 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint在默认的情况下仅用于恢复失败的作业,并不作保留,当程序取消时checkpoint就会被删除。当然,你可以配置周期性的策略来保留checkpoint。根据以下配置,这些保留的checkpoint在作业失败或取消时不会自动清除。这样,你就可以使用该checkpoint来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup`配置项定义了当作业取消时,对作业checkpoint的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业的checkpoint。 +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的checkpoint。仅当作业失败时,作业的checkpoint才会被启用。 -### Directory Structure +### 目录结构 -Similarly to [savepoints](savepoints.html), a checkpoint consists -of a meta data file and, depending on the state backend, some additional data -files. The meta data file and data files are stored in the directory that is -configured via `state.checkpoints.dir` in the configuration files, -and also can be specified for per job in the code. +与[savepoints](savepoints.html)相似,checkpoint由元数据文件、额外的数据文件(与state backend相关)组成。可通过配置文件中“state.checkpoints.dir”配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。 - Configure globally via configuration files + 通过配置文件全局配置 {% highlight yaml %} state.checkpoints.dir: hdfs:///checkpoints/ {% endhighlight %} - Configure for per job when constructing the state backend + 创建state backend对单个作业进行配置 Review comment: ```suggestion 创建 state backend 对单个作业进行配置 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299785643 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint在默认的情况下仅用于恢复失败的作业,并不作保留,当程序取消时checkpoint就会被删除。当然,你可以配置周期性的策略来保留checkpoint。根据以下配置,这些保留的checkpoint在作业失败或取消时不会自动清除。这样,你就可以使用该checkpoint来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup`配置项定义了当作业取消时,对作业checkpoint的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业的checkpoint。 +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的checkpoint。仅当作业失败时,作业的checkpoint才会被启用。 -### Directory Structure +### 目录结构 -Similarly to [savepoints](savepoints.html), a checkpoint consists -of a meta data file and, depending on the state backend, some additional data -files. The meta data file and data files are stored in the directory that is -configured via `state.checkpoints.dir` in the configuration files, -and also can be specified for per job in the code. +与[savepoints](savepoints.html)相似,checkpoint由元数据文件、额外的数据文件(与state backend相关)组成。可通过配置文件中“state.checkpoints.dir”配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。 - Configure globally via configuration files + 通过配置文件全局配置 {% highlight yaml %} state.checkpoints.dir: hdfs:///checkpoints/ {% endhighlight %} - Configure for per job when constructing the state backend + 创建state backend对单个作业进行配置 {% highlight java %} env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/")); {% endhighlight %} -### Difference to Savepoints +### Checkpoint与Savepoint的区别 -Checkpoints have a few differences from [savepoints](savepoints.html). They -- use a state backend specific (low-level) data format, may be incremental. -- do not support Flink specific features like rescaling. +checkpoint与[savepoints](savepoints.html)有一些区别,体现在他们: +- 使用特定于state backend(低级)的数据格式,可能以增量方式存储。 +- 不支持Flink的特定功能,比如扩缩容。 Review comment: ```suggestion - 不支持 Flink 的特定功能,比如扩缩容。 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299783577 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint Review comment: ```suggestion ## 保留 Checkpoint ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299784378 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint在默认的情况下仅用于恢复失败的作业,并不作保留,当程序取消时checkpoint就会被删除。当然,你可以配置周期性的策略来保留checkpoint。根据以下配置,这些保留的checkpoint在作业失败或取消时不会自动清除。这样,你就可以使用该checkpoint来恢复失败的作业。 Review comment: ```suggestion checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299784434 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint在默认的情况下仅用于恢复失败的作业,并不作保留,当程序取消时checkpoint就会被删除。当然,你可以配置周期性的策略来保留checkpoint。根据以下配置,这些保留的checkpoint在作业失败或取消时不会自动清除。这样,你就可以使用该checkpoint来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup`配置项定义了当作业取消时,对作业checkpoint的操作: Review comment: ```suggestion `ExternalizedCheckpointCleanup` 配置项定义了当作业取消时,对作业 checkpoint 的操作: ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299785669 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint在默认的情况下仅用于恢复失败的作业,并不作保留,当程序取消时checkpoint就会被删除。当然,你可以配置周期性的策略来保留checkpoint。根据以下配置,这些保留的checkpoint在作业失败或取消时不会自动清除。这样,你就可以使用该checkpoint来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup`配置项定义了当作业取消时,对作业checkpoint的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业的checkpoint。 +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的checkpoint。仅当作业失败时,作业的checkpoint才会被启用。 -### Directory Structure +### 目录结构 -Similarly to [savepoints](savepoints.html), a checkpoint consists -of a meta data file and, depending on the state backend, some additional data -files. The meta data file and data files are stored in the directory that is -configured via `state.checkpoints.dir` in the configuration files, -and also can be specified for per job in the code. +与[savepoints](savepoints.html)相似,checkpoint由元数据文件、额外的数据文件(与state backend相关)组成。可通过配置文件中“state.checkpoints.dir”配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。 - Configure globally via configuration files + 通过配置文件全局配置 {% highlight yaml %} state.checkpoints.dir: hdfs:///checkpoints/ {% endhighlight %} - Configure for per job when constructing the state backend + 创建state backend对单个作业进行配置 {% highlight java %} env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/")); {% endhighlight %} -### Difference to Savepoints +### Checkpoint与Savepoint的区别 -Checkpoints have a few differences from [savepoints](savepoints.html). They -- use a state backend specific (low-level) data format, may be incremental. -- do not support Flink specific features like rescaling. +checkpoint与[savepoints](savepoints.html)有一些区别,体现在他们: +- 使用特定于state backend(低级)的数据格式,可能以增量方式存储。 +- 不支持Flink的特定功能,比如扩缩容。 -### Resuming from a retained checkpoint +### 从checkpoint中恢复状态 Review comment: ```suggestion ### 从保留的 checkpoint 中恢复状态 ``` This is an automated message from the Apache Git Service. 
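The suggestion above fixes the heading for resuming from a *retained* checkpoint. On the same docs page, resuming works like resuming from a savepoint: pass the checkpoint's metadata path to the CLI's `-s` flag. A sketch with placeholder values (job id, checkpoint number, and jar name are illustrative):

```shell
# Resume a job from a retained checkpoint's metadata file.
# <job-id>, <n>, and my-job.jar are placeholders, not values from the PR.
bin/flink run -s hdfs:///checkpoints/<job-id>/chk-<n>/_metadata my-job.jar
```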
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299784986 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint在默认的情况下仅用于恢复失败的作业,并不作保留,当程序取消时checkpoint就会被删除。当然,你可以配置周期性的策略来保留checkpoint。根据以下配置,这些保留的checkpoint在作业失败或取消时不会自动清除。这样,你就可以使用该checkpoint来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup`配置项定义了当作业取消时,对作业checkpoint的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业的checkpoint。 +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的checkpoint。仅当作业失败时,作业的checkpoint才会被启用。 -### Directory Structure +### 目录结构 -Similarly to [savepoints](savepoints.html), a checkpoint consists -of a meta data file and, depending on the state backend, some additional data -files. The meta data file and data files are stored in the directory that is -configured via `state.checkpoints.dir` in the configuration files, -and also can be specified for per job in the code. +与[savepoints](savepoints.html)相似,checkpoint由元数据文件、额外的数据文件(与state backend相关)组成。可通过配置文件中“state.checkpoints.dir”配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。 Review comment: ```suggestion 与 [savepoints](savepoints.html) 相似,checkpoint 由元数据文件、数据文件(与state backend 相关)组成。可通过配置文件中 “state.checkpoints.dir” 配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299783489 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 Review comment: ```suggestion checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299783547 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 Review comment: ```suggestion 参考 [Checkpointing]({{ site.baseurl }}/zh/dev/stream/state/checkpointing.html) 查看如何在 Flink 程序中开启和配置 checkpoint。 ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299785407 ## File path: docs/ops/state/checkpoints.zh.md ##

The comment targets the translated line:

+checkpoint与[savepoints](savepoints.html)有一些区别,体现在他们:

Review comment:
```suggestion
checkpoint 与 [savepoints](savepoints.html) 有一些区别,体现在 checkpoint :
```
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299785612 ## File path: docs/ops/state/checkpoints.zh.md ##

The comment targets the translated line:

+- 使用特定于state backend(低级)的数据格式,可能以增量方式存储。

Review comment:
```suggestion
- 使用 state backend 特定的数据格式,可能以增量方式存储。
```
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299784530 ## File path: docs/ops/state/checkpoints.zh.md ##

The comment targets the translated line:

+- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业的checkpoint。

Review comment:
```suggestion
- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业的 checkpoint。
```
[GitHub] [flink] klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese
klion26 commented on a change in pull request #8943: [FLINK-11637][doc-zh]Translate Checkpoints page into Chinese URL: https://github.com/apache/flink/pull/8943#discussion_r299786000 ## File path: docs/ops/state/checkpoints.zh.md ## @@ -26,69 +26,49 @@ under the License. * toc {:toc} -## Overview +## 概述 +checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语义。 -Checkpoints make state in Flink fault tolerant by allowing state and the -corresponding stream positions to be recovered, thereby giving the application -the same semantics as a failure-free execution. +参考[Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html)查看如何在Flink程序中开启和配置checkpoint。 -See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for how to enable and -configure checkpoints for your program. +## 保留Checkpoint -## Retained Checkpoints - -Checkpoints are by default not retained and are only used to resume a -job from failures. They are deleted when a program is cancelled. -You can, however, configure periodic checkpoints to be retained. -Depending on the configuration these *retained* checkpoints are *not* -automatically cleaned up when the job fails or is canceled. -This way, you will have a checkpoint around to resume from if your job fails. +checkpoint在默认的情况下仅用于恢复失败的作业,并不作保留,当程序取消时checkpoint就会被删除。当然,你可以配置周期性的策略来保留checkpoint。根据以下配置,这些保留的checkpoint在作业失败或取消时不会自动清除。这样,你就可以使用该checkpoint来恢复失败的作业。 {% highlight java %} CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %} -The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job: - -- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case. 
- -- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. +`ExternalizedCheckpointCleanup`配置项定义了当作业取消时,对作业checkpoint的操作: +- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**:当作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业的checkpoint。 +- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**:当作业取消时,删除作业的checkpoint。仅当作业失败时,作业的checkpoint才会被启用。 -### Directory Structure +### 目录结构 -Similarly to [savepoints](savepoints.html), a checkpoint consists -of a meta data file and, depending on the state backend, some additional data -files. The meta data file and data files are stored in the directory that is -configured via `state.checkpoints.dir` in the configuration files, -and also can be specified for per job in the code. +与[savepoints](savepoints.html)相似,checkpoint由元数据文件、额外的数据文件(与state backend相关)组成。可通过配置文件中“state.checkpoints.dir”配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。 - Configure globally via configuration files + 通过配置文件全局配置 {% highlight yaml %} state.checkpoints.dir: hdfs:///checkpoints/ {% endhighlight %} - Configure for per job when constructing the state backend + 创建state backend对单个作业进行配置 {% highlight java %} env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/")); {% endhighlight %} -### Difference to Savepoints +### Checkpoint与Savepoint的区别 -Checkpoints have a few differences from [savepoints](savepoints.html). They -- use a state backend specific (low-level) data format, may be incremental. -- do not support Flink specific features like rescaling. +checkpoint与[savepoints](savepoints.html)有一些区别,体现在他们: +- 使用特定于state backend(低级)的数据格式,可能以增量方式存储。 +- 不支持Flink的特定功能,比如扩缩容。 -### Resuming from a retained checkpoint +### 从checkpoint中恢复状态 -A job may be resumed from a checkpoint just as from a savepoint -by using the checkpoint's meta data file instead (see the -[savepoint restore guide](../cli.html#restore-a-savepoint)). 
Note that if the -meta data file is not self-contained, the jobmanager needs to have access to -the data files it refers to (see [Directory Structure](#directory-structure) -above). +与savepoint一样,作业可以从checkpoint的元数据文件恢复运行([savepoint恢复指南](../cli.html#restore-a-savepoint))。注意,如果元数据文件中信息不充分,那么jobmanager就需要使用相关的数据文件来恢复作业(参考[目录结构](#directory-structure))。

Review comment:
```suggestion
与 savepoint 一样,作业可以从 checkpoint 的元数据文件恢复运行([savepoint恢复指南](../cli.html#restore-a-savepoint))。注意,如果元数据文件中信息不充分,那么 jobmanager 就需要读取相关的数据文件来恢复作业(参考[目录结构](#directory-structure))。
```
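The hunks quoted in this review thread cover both where checkpoint files live and whether they survive cancellation. Pulling the quoted settings together, a minimal `flink-conf.yaml` sketch might look like the fragment below; the HDFS path is a placeholder, and `state.checkpoints.num-retained` is an extra knob not shown in the quoted doc text:

```yaml
# Root directory for checkpoint metadata (and, depending on the state
# backend, additional data files). Placeholder path.
state.checkpoints.dir: hdfs:///checkpoints/
# How many completed checkpoints to keep around (optional).
state.checkpoints.num-retained: 3
```

Retention across cancellation itself is configured in code via `CheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)`, as the Java snippet quoted in the doc shows.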
[jira] [Commented] (FLINK-13032) Allow Access to Per-Window State in mergeable window ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-13032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877507#comment-16877507 ]

jasine chen commented on FLINK-13032:
-
[~yanghua] Yes, it's helpful in our scenario. We use session windows to analyze behavior, and we do need to record, store, and output the state of each session window.

> Allow Access to Per-Window State in mergeable window ProcessWindowFunction
> ---
>
> Key: FLINK-13032
> URL: https://issues.apache.org/jira/browse/FLINK-13032
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.8.0
> Reporter: jasine chen
> Priority: Major
>
> Access to per-window state is allowed in non-merging windows, but it's
> necessary to access per-window state in mergeable windows.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13072) RocksDBStateBackend is not thread safe and loses data silently
lynn1.zhang created FLINK-13072:
---
Summary: RocksDBStateBackend is not thread safe and loses data silently
Key: FLINK-13072
URL: https://issues.apache.org/jira/browse/FLINK-13072
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.8.1, 1.8.0
Reporter: lynn1.zhang
Attachments: flink-demo.zip

I create two MapStates in one operator, then create two threads in the apply method; each thread operates on its own map state (the operations are identical). The expected result is that the two states end up with the same contents, but they do not. I have uploaded the code, please help to try it.
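The hazard behind this report can be shown with plain JDK types (none of the names below are Flink APIs): structures that are not thread-safe lose updates silently under concurrent writes, which is why state access must stay on a single task thread. A minimal sketch of the safe multi-writer pattern, for contrast:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Two writer threads update a shared map. With a plain HashMap the total
// would typically come out short of 200000 (or the map could corrupt
// internally); with ConcurrentHashMap + LongAdder it is exact.
public class SharedStateDemo {

    /** Runs two writer threads against a thread-safe map and returns the total count. */
    static long runConcurrent(int incrementsPerThread) throws InterruptedException {
        Map<String, LongAdder> state = new ConcurrentHashMap<>();
        Runnable writer = () -> {
            for (int i = 0; i < incrementsPerThread; i++) {
                state.computeIfAbsent("key", k -> new LongAdder()).increment();
            }
        };
        Thread t1 = new Thread(writer);
        Thread t2 = new Thread(writer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return state.get("key").sum();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runConcurrent(100_000)); // 200000
    }
}
```

Flink state backends follow the first option instead (single-threaded access from the task thread), which is why spawning extra threads inside `apply` breaks the contract.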
[GitHub] [flink] stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload
stevenzwu opened a new pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload URL: https://github.com/apache/flink/pull/8665
[GitHub] [flink] stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload
stevenzwu closed pull request #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload URL: https://github.com/apache/flink/pull/8665
[GitHub] [flink] Garlandal closed pull request #8960: fix: ignore infinite metrics value
Garlandal closed pull request #8960: fix: ignore infinite metrics value URL: https://github.com/apache/flink/pull/8960
[GitHub] [flink] Garlandal opened a new pull request #8960: fix: ignore infinite metrics value
Garlandal opened a new pull request #8960: fix: ignore infinite metrics value URL: https://github.com/apache/flink/pull/8960

InfluxDB does not support inf values, and Flink's metrics produce quite a few of them; if they are reported as-is, the TaskManager keeps throwing exceptions. There is already an issue for this, https://issues.apache.org/jira/browse/FLINK-12147, but it looks like the Flink community would rather have InfluxDB change to support inf, which may take a long time, so this PR is a workaround for now.
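The workaround described above amounts to filtering non-finite values before they reach the reporter. A minimal sketch of that idea (the class and method names here are hypothetical, not the actual flink-metrics-influxdb API):

```java
// Skip non-finite gauge values instead of letting the InfluxDB reporter
// throw on every report interval.
public class FiniteMetricFilter {

    /** Returns true if the gauge value is safe to send to InfluxDB. */
    public static boolean isReportable(double value) {
        // InfluxDB rejects inf, -inf and NaN field values, so drop them.
        return Double.isFinite(value);
    }

    public static void main(String[] args) {
        System.out.println(isReportable(42.0));                     // true
        System.out.println(isReportable(Double.POSITIVE_INFINITY)); // false
        System.out.println(isReportable(Double.NaN));               // false
    }
}
```

In a real reporter this check would wrap each gauge read, silently dropping the data point rather than aborting the whole report.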
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r299768508 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ## @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration. + */ +public class RestartBackoffTimeStrategyFactoryLoader { + + private static final Logger LOG = LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class); + + private static final String CREATE_METHOD = "createFactory"; + + /** +* Creates proper {@link RestartBackoffTimeStrategy.Factory}. +* If new version restart strategy is specified, will directly use it. +* Otherwise will decide based on legacy restart strategy configs. 
+* +* @param jobRestartStrategyConfiguration restart configuration given within the job graph +* @param clusterConfiguration cluster(server-side) configuration, usually represented as jobmanager config +* @param isCheckpointingEnabled if checkpointing was enabled for the job +* @return new version restart strategy factory +*/ + public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory( + final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration, + final Configuration clusterConfiguration, + final boolean isCheckpointingEnabled) throws Exception { + + checkNotNull(jobRestartStrategyConfiguration); + checkNotNull(clusterConfiguration); + + final String restartStrategyClassName = clusterConfiguration.getString( + RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME); + + if (restartStrategyClassName != null) { + // create new restart strategy directly if it is specified in cluster config + return createRestartStrategyFactoryInternal(clusterConfiguration); + } else { + // adapt the legacy restart strategy configs as new restart strategy configs + final Configuration adaptedConfiguration = getAdaptedConfiguration( + jobRestartStrategyConfiguration, + clusterConfiguration, + isCheckpointingEnabled); + + // create new restart strategy from the adapted config + return createRestartStrategyFactoryInternal(adaptedConfiguration); + } + } + + /** +* Decides the {@link RestartBackoffTimeStrategy} to use and its params based on legacy configs, +* and records its class name and the params into a adapted configuration. +* +* The decision making is as follows: +* +* Use strategy of {@link
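The Javadoc quoted above describes a fallback order for picking the restart strategy. Condensed into plain Java (all names below are illustrative, not the real Flink loader code), the decision looks roughly like this:

```java
// Illustrative sketch of the decision order described in the quoted
// Javadoc; not the actual RestartBackoffTimeStrategyFactoryLoader.
public class StrategyChoiceSketch {

    /**
     * Picks a restart strategy name. Order: a cluster-level new-generation
     * class name wins; otherwise the legacy per-job strategy is adapted;
     * otherwise fall back based on the checkpointing flag.
     */
    static String chooseStrategy(
            String newStrategyClassName,   // cluster config, may be null
            String legacyJobStrategy,      // legacy job config, may be null
            boolean checkpointingEnabled) {
        if (newStrategyClassName != null) {
            return newStrategyClassName;
        }
        if (legacyJobStrategy != null) {
            return "adapted:" + legacyJobStrategy;
        }
        // Legacy default: restart with a fixed delay if checkpointing is
        // enabled, otherwise do not restart at all.
        return checkpointingEnabled ? "fixed-delay" : "no-restart";
    }

    public static void main(String[] args) {
        System.out.println(chooseStrategy(null, null, false));            // no-restart
        System.out.println(chooseStrategy(null, "failure-rate", false));  // adapted:failure-rate
        System.out.println(chooseStrategy("MyStrategyFactory", null, true)); // MyStrategyFactory
    }
}
```

The adaptation step in the sketch stands in for the loader rewriting legacy options into the new-style configuration before instantiating the factory.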
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r299771028 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ## (the comment quotes the same RestartBackoffTimeStrategyFactoryLoader.java hunk shown in the previous review comment)
[jira] [Created] (FLINK-13071) QueryOperationConverter in Blink planner supports all kinds of QueryOperations
Jing Zhang created FLINK-13071:
--
Summary: QueryOperationConverter in Blink planner supports all kinds of QueryOperations
Key: FLINK-13071
URL: https://issues.apache.org/jira/browse/FLINK-13071
Project: Flink
Issue Type: Task
Components: Table SQL / API
Reporter: Jing Zhang
Assignee: Jing Zhang
[GitHub] [flink] zhuzhurk commented on issue #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on issue #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#issuecomment-507927294 Force-pushed after rebasing onto master.
[GitHub] [flink] XuPingyong commented on issue #8951: [FLINK-13062] Set ScheduleMode based on boundedness of streaming Pipeline
XuPingyong commented on issue #8951: [FLINK-13062] Set ScheduleMode based on boundedness of streaming Pipeline URL: https://github.com/apache/flink/pull/8951#issuecomment-507926214 Thanks @aljoscha. I think the boundedness of a streaming pipeline is not tied to streaming or batch behavior: a bounded source (e.g. a collection source) can run not only as a batch job but also as a streaming job.
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r299766343 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ## (the comment quotes a slightly longer variant of the same RestartBackoffTimeStrategyFactoryLoader.java hunk shown above)
[GitHub] [flink] flinkbot commented on issue #8959: [FLINK-13070][table-planner-blink] Remove TableImpl and use api.internal.TableImpl in blink
flinkbot commented on issue #8959: [FLINK-13070][table-planner-blink] Remove TableImpl and use api.internal.TableImpl in blink
URL: https://github.com/apache/flink/pull/8959#issuecomment-507922656

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-13070) Remove TableImpl and use api.internal.TableImpl in blink
[ https://issues.apache.org/jira/browse/FLINK-13070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13070: --- Labels: pull-request-available (was: ) > Remove TableImpl and use api.internal.TableImpl in blink > > > Key: FLINK-13070 > URL: https://issues.apache.org/jira/browse/FLINK-13070 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] JingsongLi opened a new pull request #8959: [FLINK-13070][table-planner-blink] Remove TableImpl and use api.internal.TableImpl in blink
JingsongLi opened a new pull request #8959: [FLINK-13070][table-planner-blink] Remove TableImpl and use api.internal.TableImpl in blink
URL: https://github.com/apache/flink/pull/8959

## What is the purpose of the change

Remove TableImpl and use api.internal.TableImpl in blink

## Verifying this change

ut

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
[jira] [Created] (FLINK-13070) Remove TableImpl and use api.internal.TableImpl in blink
Jingsong Lee created FLINK-13070: Summary: Remove TableImpl and use api.internal.TableImpl in blink Key: FLINK-13070 URL: https://issues.apache.org/jira/browse/FLINK-13070 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Jingsong Lee Assignee: Jingsong Lee
[GitHub] [flink] flinkbot commented on issue #8958: [FLINK-13035] [Runtime / Task] make LocalStreamEnvironment launch act…
flinkbot commented on issue #8958: [FLINK-13035] [Runtime / Task] make LocalStreamEnvironment launch act… URL: https://github.com/apache/flink/pull/8958#issuecomment-507921864 (standard @flinkbot review-progress message, identical to the one posted on #8959 above)
[GitHub] [flink] spafka opened a new pull request #8958: [FLINK-13035] [Runtime / Task] make LocalStreamEnvironment launch act…
spafka opened a new pull request #8958: [FLINK-13035] [Runtime / Task] make LocalStreamEnvironment launch act…
URL: https://github.com/apache/flink/pull/8958

…ual task slots number.

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log

*(for example:)*
- *LocalStreamEnvironment may fail to launch because some tasks use a different slot group; this PR may fix it*

## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)
[jira] [Updated] (FLINK-13035) LocalStreamEnvironment shall launch actuall task solts
[ https://issues.apache.org/jira/browse/FLINK-13035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13035: --- Labels: pull-request-available (was: )

> LocalStreamEnvironment shall launch actual task slots
> ---
> Key: FLINK-13035
> URL: https://issues.apache.org/jira/browse/FLINK-13035
> Project: Flink
> Issue Type: Wish
> Components: Runtime / Task
> Affects Versions: 1.9.0
> Reporter: Wong
> Assignee: Wong
> Priority: Trivial
> Labels: pull-request-available
>
> When developing Flink jobs, different slot groups are sometimes used to expand threads. But the MiniCluster currently uses the default jobGraph.getMaximumParallelism(), which is sometimes less than the actual number of slots, so it can't launch the job if TaskManagerOptions.NUM_TASK_SLOTS is not set. Is this needed?
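The issue above hinges on how many slots a job actually needs: with slot sharing, each slot-sharing group requires as many slots as its maximum vertex parallelism, so the total requirement is the sum of those group maxima — which can exceed the job-wide maximum parallelism the local environment defaults to. A hedged sketch of that computation (the method and map names are illustrative, not Flink's actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class RequiredSlotsSketch {

    /**
     * Computes the slots a job needs under slot sharing: each slot-sharing group
     * needs one slot per parallel subtask of its widest vertex, and groups cannot
     * share slots with each other, so the requirements add up across groups.
     *
     * @param vertexGroup       vertex name -> slot-sharing group name
     * @param vertexParallelism vertex name -> parallelism
     */
    public static int requiredSlots(
            Map<String, String> vertexGroup, Map<String, Integer> vertexParallelism) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Map.Entry<String, String> e : vertexGroup.entrySet()) {
            int p = vertexParallelism.get(e.getKey());
            maxPerGroup.merge(e.getValue(), p, Math::max); // widest vertex per group
        }
        return maxPerGroup.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```

With every vertex in one group this reduces to the maximum parallelism; with two groups it is the sum of the two group maxima, which is the gap the JIRA describes.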
[jira] [Created] (FLINK-13069) HiveTableSink should implement OverwritableTableSink
Rui Li created FLINK-13069: -- Summary: HiveTableSink should implement OverwritableTableSink Key: FLINK-13069 URL: https://issues.apache.org/jira/browse/FLINK-13069 Project: Flink Issue Type: Sub-task Reporter: Rui Li Assignee: Rui Li
[jira] [Created] (FLINK-13068) HiveTableSink should implement PartitionableTableSink
Rui Li created FLINK-13068: -- Summary: HiveTableSink should implement PartitionableTableSink Key: FLINK-13068 URL: https://issues.apache.org/jira/browse/FLINK-13068 Project: Flink Issue Type: Sub-task Reporter: Rui Li Assignee: Rui Li
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r299761058

## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala

@@ -100,22 +100,21 @@ class StreamPlanner(
     parsed match {
       case insert: SqlInsert =>
-        val targetColumnList = insert.getTargetColumnList

Review comment: Removed computed columns and added this code snippet back.
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r299760631

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/ComputedColumn.java

@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Computed column.
+ *
+ * If we got a create table DDL statement:
+ *
+ *   create table t(
+ *     a int,
+ *     b varchar,
+ *     c as to_timestamp(b))
+ *   with (
+ *     connector = 'csv'
+ *   )
+ *
+ * The computed column would expect to be {@code ComputedColumn#of("c", "to_timestamp(b)")}.
+ */
+@Internal
+public class ComputedColumn implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String name;
+
+    private final String expression;
+
+    /** Please use {@link #of(String, String)} instead. */
+    private ComputedColumn(String name, String expression) {
+        this.name = name;
+        this.expression = expression;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getExpression() {
+        return expression;
+    }
+
+    public String getProjectExpression() {

Review comment: removed
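The excerpt's private constructor points at an `#of(String, String)` factory that falls outside the quoted hunk. A minimal sketch of how such a factory could look — the body here is an assumption for illustration, not the PR's actual code:

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical completion of the ComputedColumn excerpt above; not the PR's actual code.
public class ComputedColumn implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;        // column name, e.g. "c"
    private final String expression;  // SQL expression, e.g. "to_timestamp(b)"

    // The excerpt's Javadoc says: "Please use #of(String, String) instead."
    private ComputedColumn(String name, String expression) {
        this.name = Objects.requireNonNull(name);
        this.expression = Objects.requireNonNull(expression);
    }

    // Static factory referenced by the private constructor's Javadoc.
    public static ComputedColumn of(String name, String expression) {
        return new ComputedColumn(name, expression);
    }

    public String getName() { return name; }
    public String getExpression() { return expression; }

    public static void main(String[] args) {
        ComputedColumn c = ComputedColumn.of("c", "to_timestamp(b)");
        System.out.println(c.getName() + " as " + c.getExpression());
    }
}
```

A static factory over a private constructor keeps room for interning or validation later without changing call sites, which is presumably why the PR chose this shape.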
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r299760657

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java

@@ -56,9 +57,28 @@
     private final Map fieldNameToIndex;

-    private TableSchema(String[] fieldNames, DataType[] fieldDataTypes) {
+    private final String[] primaryKeys;
+
+    private final String[][] uniqueKeys;

Review comment: Removed
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r299760572

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java

@@ -0,0 +1,68 @@
+/* (standard Apache License 2.0 header, as in the other new files) */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a CREATE TABLE statement.
+ */
+public class CreateTableOperation implements CreateOperation {
+    private final String[] tablePath;
+    private CatalogTable catalogTable;
+    private boolean ignoreIfExists;
+
+    public CreateTableOperation(String[] tablePath,
+            CatalogTable catalogTable,
+            boolean ignoreIfExists) {
+        this.tablePath = tablePath;
+        this.catalogTable = catalogTable;
+        this.ignoreIfExists = ignoreIfExists;
+    }
+
+    public CatalogTable getCatalogTable() {
+        return catalogTable;
+    }
+
+    public String[] getTablePath() {
+        return tablePath;
+    }
+
+    public boolean isIgnoreIfExists() {
+        return ignoreIfExists;
+    }
+
+    @Override
+    public String asSummaryString() {

Review comment: Added
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r299760129

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java

@@ -114,6 +116,24 @@
      */
     void registerTable(String name, Table table);

+    /**
+     * Registers a {@link CatalogTable} under a given object path. The {@code path} could be
+     * in 3 formats:
+     *
+     *   `catalog.db.table`: A full table path including the catalog name,
+     *   the database name and table name.
+     *   `db.table`: database name followed by table name, with the current catalog name.
+     *   `table`: Only the table name, with the current catalog name and database name.
+     *
+     * The registered tables can then be referenced in SQL queries.
+     *
+     * @param path The path under which the table will be registered
+     * @param catalogTable The table to register
+     * @param ignoreIfExists If true, do nothing if there is already a table with the same name under
+     *                       the {@code path}. If false, a TableAlreadyExistException is thrown.
+     */
+    void registerTable(String[] path, CatalogBaseTable catalogTable, boolean ignoreIfExists);

Review comment: Agree and removed, thanks
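The three path formats in that Javadoc can be normalized against the session's current catalog and database. A small sketch of such a resolution step — the class name, method name, and default names are illustrative assumptions, not Flink's actual resolution code:

```java
import java.util.Arrays;

public class TablePathResolver {

    // Expands a 1-, 2- or 3-part table path into [catalog, database, table],
    // filling in the current catalog/database for the shorter forms.
    public static String[] resolve(String[] path, String currentCatalog, String currentDatabase) {
        switch (path.length) {
            case 3: // catalog.db.table: already fully qualified
                return path.clone();
            case 2: // db.table: prepend the current catalog
                return new String[] {currentCatalog, path[0], path[1]};
            case 1: // table: prepend the current catalog and database
                return new String[] {currentCatalog, currentDatabase, path[0]};
            default:
                throw new IllegalArgumentException(
                    "Path must have 1 to 3 parts: " + Arrays.toString(path));
        }
    }

    public static void main(String[] args) {
        String[] full = resolve(new String[] {"t"}, "default_catalog", "default_database");
        System.out.println(Arrays.toString(full)); // [default_catalog, default_database, t]
    }
}
```

Normalizing to the fully qualified form early keeps the rest of the catalog code free of per-format special cases.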
[GitHub] [flink] danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r299760275

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateOperation.java

@@ -0,0 +1,34 @@
+/* (standard Apache License 2.0 header, as in the other new files) */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.Operation;
+
+/**
+ * A {@link Operation} that describes the DDL statements, e.g. CREATE TABLE or CREATE FUNCTION.
+ *
+ * Different sub operations can have their special instances. For example, a
+ * create table operation will have a {@link org.apache.flink.table.catalog.CatalogTable} instance,
+ * while a create function operation will have a
+ * {@link org.apache.flink.api.common.functions.Function} instance.

Review comment: Yep, thanks
[jira] [Comment Edited] (FLINK-12833) Add Klaviyo to Chinese PoweredBy page
[ https://issues.apache.org/jira/browse/FLINK-12833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877445#comment-16877445 ] yelun edited comment on FLINK-12833 at 7/3/19 3:09 AM: --- Hi,[~jark],do you know what this issue means? thanks was (Author: guanghui): Hi,[~jark],do you know what this isuus means? thanks > Add Klaviyo to Chinese PoweredBy page > - > > Key: FLINK-12833 > URL: https://issues.apache.org/jira/browse/FLINK-12833 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Fabian Hueske >Assignee: yelun >Priority: Major > > Commit b54ecfa930653bcfecd60df3414deca5291c6cb3 added Klaviyo to the English > PoweredBy page. > It should be added to the Chinese page as well.
[GitHub] [flink] klion26 commented on issue #8751: [FLINK-11937][StateBackend]Resolve small file problem in RocksDB incremental checkpoint
klion26 commented on issue #8751: [FLINK-11937][StateBackend]Resolve small file problem in RocksDB incremental checkpoint
URL: https://github.com/apache/flink/pull/8751#issuecomment-507920049

@StephanEwen thanks for the comments, I'm trying to answer the questions below:

- As a high-level description: this change introduces a new state handle (`FsSegmentStateHandle`), modifies the checkpoint metadata (**it does not modify the layout**, it just adds a type for `FsSegmentStateHandle` in `SavepointV2Serializer`), adds some information in `SharedRegistry` to track the underlying file's reference count, and makes other necessary modifications.
- This is a new option that users need to activate.
- I think there are no compatibility problems: **for the checkpoint meta we don't change the layout**, we just add a new type for the new state handle. For restoring from the existing `FileStateHandle`, we delegate to `RocksDBStateDownloader#downloadDataForStateHandle`, which will read both `FileStateHandle` and `FsSegmentStateHandle` correctly. For `SharedStateRegistry`, all the modifications only affect the newly introduced state handle.
- In my opinion, we can't do this change in the state backend alone, for the following reasons:
  - first, we need to track the position (start position and end position) of the current state handle within the file (because after applying this change, each state handle maps to a block of one file);
  - second, we need to track the reference count of the underlying file, so that we can delete the file in the future at the right time (not too early and not too late: deleting the file too early will cause `IOException`s, deleting it too late will consume too much disk space).

Please let me know if I need to give more information, sir.
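The second point — deleting a shared file exactly when its last referencing segment is released — is a classic reference-counting problem. A minimal sketch of the idea, with illustrative names (`SharedFileRegistry`, `register`, `release`); this is not the PR's actual `SharedStateRegistry` code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: multiple state-handle segments share one underlying file; the file
// may only be deleted once the last segment referencing it has been released.
public class SharedFileRegistry {

    private final Map<String, Integer> refCounts = new HashMap<>();

    /** A new segment of {@code filePath} was registered; bump its reference count. */
    public synchronized void register(String filePath) {
        refCounts.merge(filePath, 1, Integer::sum);
    }

    /** Returns true iff this was the last reference, i.e. the file can now be deleted. */
    public synchronized boolean release(String filePath) {
        Integer count = refCounts.get(filePath);
        if (count == null) {
            throw new IllegalStateException("Unknown file: " + filePath);
        }
        if (count == 1) {
            refCounts.remove(filePath);
            return true; // not earlier (readers would hit IOException) nor later (disk waste)
        }
        refCounts.put(filePath, count - 1);
        return false;
    }
}
```

The registry, not the state backend, must own this count, because segments from different checkpoints (and thus different backends) can point into the same file — which matches the argument in the comment above.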
[jira] [Commented] (FLINK-12833) Add Klaviyo to Chinese PoweredBy page
[ https://issues.apache.org/jira/browse/FLINK-12833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877445#comment-16877445 ] yelun commented on FLINK-12833: --- Hi,[~jark],do you know what this issue means? thanks > Add Klaviyo to Chinese PoweredBy page > - > > Key: FLINK-12833 > URL: https://issues.apache.org/jira/browse/FLINK-12833 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Fabian Hueske >Assignee: yelun >Priority: Major > > Commit b54ecfa930653bcfecd60df3414deca5291c6cb3 added Klaviyo to the English > PoweredBy page. > It should be added to the Chinese page as well.
[GitHub] [flink] zjuwangg commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
zjuwangg commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r299760649

## File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java

@@ -1069,6 +1070,84 @@ public void testListPartitionPartialSpec() throws Exception {
         assertEquals(1, catalog.listPartitions(path1, createAnotherPartitionSpecSubset()).size());
     }
+
+    // -- table and column stats --
+
+    @Test
+    public void testGetTableStats_TableNotExistException() throws Exception {
+        catalog.createDatabase(db1, createDb(), false);
+        exception.expect(org.apache.flink.table.catalog.exceptions.TableNotExistException.class);
+        catalog.getTableStatistics(path1);
+    }
+
+    @Test
+    public void testGetPartitionStats() throws Exception {
+        catalog.createDatabase(db1, createDb(), false);
+        catalog.createTable(path1, createPartitionedTable(), false);
+        catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+        CatalogTableStatistics tableStatistics = catalog.getPartitionStatistics(path1, createPartitionSpec());
+        assertEquals(0, tableStatistics.getFileCount());

Review comment: it would cause an unwanted reverse dependency, because HiveStatsUtil is a class of flink-connector-hive.
[GitHub] [flink] zhijiangW commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#issuecomment-507919160 Thanks for the patience and for addressing my comments @Aitozi! I think you might have missed one previous comment about using tuple2 instead of tuple3. After fixing this I have no other concerns. LGTM!
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299759623

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java

@@ -0,0 +1,293 @@
+/* (standard Apache License 2.0 header, as in the other new files) */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+    @Test
+    public void testCalculateTotalBuffersSize() throws IOException {
+        int numberOfRemoteChannels = 2;
+        int numberOfLocalChannels = 0;
+
+        int numberOfBufferPerChannel = 2;
+        int numberOfBuffersPerGate = 8;
+
+        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+            .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+            .setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+            .build();
+
+        SingleInputGate inputGate1 = buildInputGate(
+            network,
+            numberOfRemoteChannels,
+            numberOfLocalChannels).f0;
+
+        SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
+        FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+        ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+        CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(
+            floatingBuffersUsageGauge,
+            exclusiveBuffersUsageGauge,
+            inputGates);
+
+        try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+
+            closeableRegistry.registerCloseable(network::close);
+            closeableRegistry.registerCloseable(inputGate1::close);
+
+            assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+            assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+            assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+        }
+    }
+
+    @Test
+    public void testExclusiveBuffersUsage() throws IOException {
+        int numberOfRemoteChannelsGate1 = 2;
+        int numberOfLocalChannelsGate1 = 0;
+        int numberOfRemoteChannelsGate2 = 1;
+        int numberOfLocalChannelsGate2 = 1;
+
+        int totalNumberOfRemoteChannels = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+
+        int buffersPerChannel = 2;
+        int extraNetworkBuffersPerGate = 8;
+
+        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+
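The gauges exercised by the quoted test combine two pools: exclusive buffers (per remote channel) and floating buffers (per gate), and the overall usage is used buffers over total buffers across both. A hedged sketch of that arithmetic — the class and parameter names are illustrative, not the gauges' actual code:

```java
public class BuffersUsageSketch {

    // Overall credit-based input-buffers usage: used buffers over total buffers,
    // combining the floating (per-gate) and exclusive (per-channel) pools.
    public static float usage(int usedFloating, int totalFloating,
                              int usedExclusive, int totalExclusive) {
        int total = totalFloating + totalExclusive;
        return total == 0 ? 0f : (float) (usedFloating + usedExclusive) / total;
    }

    public static void main(String[] args) {
        // e.g. 4 of 8 floating and 2 of 4 exclusive buffers in use -> 6/12
        System.out.println(usage(4, 8, 2, 4)); // 0.5
    }
}
```

This matches the test's totals: with 2 remote channels, 2 exclusive buffers per channel and 8 floating buffers per gate, the combined denominator is 2 * 2 + 8 = 12 buffers.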
[GitHub] [flink] Aitozi commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
Aitozi commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299758981

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java

(quotes the same `InputBuffersMetricsTest` excerpt as the review comment above)
extraNetworkBuffersPerGate = 8; + + NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() +
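The quoted test above asserts that the total input-buffer count is the sum of the floating buffers per gate and the exclusive buffers per remote channel. A minimal, self-contained sketch of that arithmetic (the class and method names below are hypothetical simplifications for illustration, not Flink's actual gauge API):

```java
// Sketch of how a combined credit-based input-buffers usage metric can be
// computed from the floating (per-gate) and exclusive (per-remote-channel)
// pools. Assumption: names are illustrative, not Flink's real classes.
public class BuffersUsageSketch {

    /** Usage of the floating (shared, per-gate) buffer pool. */
    public static float floatingUsage(int usedFloating, int floatingPerGate) {
        return floatingPerGate == 0 ? 0.0f : (float) usedFloating / floatingPerGate;
    }

    /** Usage of the exclusive (per-remote-channel) buffers. */
    public static float exclusiveUsage(int usedExclusive, int remoteChannels, int buffersPerChannel) {
        int total = remoteChannels * buffersPerChannel;
        return total == 0 ? 0.0f : (float) usedExclusive / total;
    }

    /** Combined usage: used buffers of both kinds over the total of both kinds. */
    public static float combinedUsage(int usedFloating, int floatingPerGate,
                                      int usedExclusive, int remoteChannels, int buffersPerChannel) {
        int total = floatingPerGate + remoteChannels * buffersPerChannel;
        return total == 0 ? 0.0f : (float) (usedFloating + usedExclusive) / total;
    }

    public static void main(String[] args) {
        // Mirrors the totals asserted in testCalculateTotalBuffersSize:
        // 2 remote channels * 2 exclusive buffers + 8 floating buffers = 12 total.
        System.out.println(combinedUsage(4, 8, 2, 2, 2)); // (4 + 2) / 12 = 0.5
    }
}
```

With the test's numbers (2 remote channels × 2 exclusive buffers + 8 floating buffers = 12 total), a state of 4 floating and 2 exclusive buffers in use yields a combined usage of 0.5.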
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r299758806 ## File path: docs/_includes/generated/blob_server_configuration.html ## @@ -7,16 +7,16 @@ - -blob.client.socket.timeout -30 -The socket timeout in milliseconds for the blob client. - blob.client.connect.timeout 0 The connection timeout in milliseconds for the blob client. + Review comment: The changes to the config html are automatically generated. I guess it's due to some previous changes that did not generate the docs properly. I have notified zhijiang about it and he has already fixed it. After rebasing on master, this diff will be gone. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r299758797 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoaderTest.java ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +/** + * Unit tests for {@link RestartBackoffTimeStrategyFactoryLoader}. 
+ */ +public class RestartBackoffTimeStrategyFactoryLoaderTest extends TestLogger { + + private static final RestartStrategies.RestartStrategyConfiguration DEFAULT_RESTART_STRATEGY_CONFIGURATION = + new RestartStrategies.FallbackRestartStrategyConfiguration(); + + @Test + public void testNewStrategySpecified() throws Exception { + // specify RestartBackoffTimeStrategy directly in cluster config + final Configuration conf = new Configuration(); + conf.setString( + RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME, + TestRestartBackoffTimeStrategy.class.getName()); + + // the RestartStrategyConfiguration should not take effect as the loader will + // directly create the factory from the config of the new version strategy + final RestartBackoffTimeStrategy.Factory factory = + RestartBackoffTimeStrategyFactoryLoader.createRestartStrategyFactory( + new RestartStrategies.FailureRateRestartStrategyConfiguration( + 1, + Time.milliseconds(1000), + Time.milliseconds(1000)), + conf, + true); + + assertThat( + factory, + instanceOf(TestRestartBackoffTimeStrategy.TestRestartBackoffTimeStrategyFactory.class)); + } + + @Test + public void testInvalidNewStrategySpecified() throws Exception { + final Configuration conf = new Configuration(); + conf.setString( + RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME, + InvalidTestRestartBackoffTimeStrategy.class.getName()); + + final RestartBackoffTimeStrategy.Factory factory = + RestartBackoffTimeStrategyFactoryLoader.createRestartStrategyFactory( + DEFAULT_RESTART_STRATEGY_CONFIGURATION, + conf, + true); + + assertThat( + factory, + instanceOf(NoRestartBackoffTimeStrategy.NoRestartBackoffTimeStrategyFactory.class)); + } + + @Test + public void testNoStrategySpecifiedWhenCheckpointingEnabled() throws Exception { + final RestartBackoffTimeStrategy.Factory factory = + RestartBackoffTimeStrategyFactoryLoader.createRestartStrategyFactory( + DEFAULT_RESTART_STRATEGY_CONFIGURATION, + new 
Configuration(), + true); + + assertThat( + factory, + instanceOf(FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory.class)); + } + + @Test + public void
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299758674 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.TestingConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge; +import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge; +import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the metrics for input buffers usage. 
+ */ +public class InputBuffersMetricsTest extends TestLogger { + + @Test + public void testCalculateTotalBuffersSize() throws IOException { + int numberOfRemoteChannels = 2; + int numberOfLocalChannels = 0; + + int numberOfBufferPerChannel = 2; + int numberOfBuffersPerGate = 8; + + NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() + .setNetworkBuffersPerChannel(numberOfBufferPerChannel) + .setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate) + .build(); + + SingleInputGate inputGate1 = buildInputGate( + network, + numberOfRemoteChannels, + numberOfLocalChannels).f0; + + SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1}; + FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates); + ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates); + CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge( + floatingBuffersUsageGauge, + exclusiveBuffersUsageGauge, + inputGates); + + try (CloseableRegistry closeableRegistry = new CloseableRegistry()) { + + closeableRegistry.registerCloseable(network::close); + closeableRegistry.registerCloseable(inputGate1::close); + + assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1)); + } + } + + @Test + public void testExclusiveBuffersUsage() throws IOException { + int numberOfRemoteChannelsGate1 = 2; + int numberOfLocalChannelsGate1 = 0; + int numberOfRemoteChannelsGate2 = 1; + int numberOfLocalChannelsGate2 = 1; + + int totalNumberOfRemoteChannels = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2; + + int buffersPerChannel = 2; + int 
extraNetworkBuffersPerGate = 8; + + NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() +
[GitHub] [flink] guoweiM commented on issue #8951: [FLINK-13062] Set ScheduleMode based on boundedness of streaming Pipeline
guoweiM commented on issue #8951: [FLINK-13062] Set ScheduleMode based on boundedness of streaming Pipeline URL: https://github.com/apache/flink/pull/8951#issuecomment-507917477 Thanks @aljoscha . I think it is OK for the BlinkPlanner to choose a proper schedule mode in 1.9. I have a small question about how a planner sets the runtime configuration. Sometimes a planner might need to directly configure the engine to schedule the job in a specific way. For example, there may be bounded sources but the planner wants to use a third scheduling strategy. My point is that we might need a common way to let the planner configure how the engine executes in the future. What do you think?
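The trade-off guoweiM raises — a default schedule mode derived from pipeline boundedness versus a planner-forced strategy — can be sketched as follows (the enum, method names, and override rule are illustrative assumptions, not Flink's actual API):

```java
// Sketch: choose a ScheduleMode from boundedness, with an optional
// planner-supplied override. Names here are hypothetical simplifications.
public class ScheduleModeSketch {

    public enum ScheduleMode { EAGER, LAZY_FROM_SOURCES }

    // Default rule: a fully bounded (batch-like) pipeline can be scheduled
    // lazily from its sources; an unbounded (streaming) pipeline schedules
    // all tasks eagerly.
    public static ScheduleMode defaultFor(boolean allSourcesBounded) {
        return allSourcesBounded ? ScheduleMode.LAZY_FROM_SOURCES : ScheduleMode.EAGER;
    }

    // Illustrates guoweiM's point: a common hook letting the planner force a
    // specific scheduling strategy instead of the boundedness-derived default.
    public static ScheduleMode resolve(boolean allSourcesBounded, ScheduleMode plannerOverride) {
        return plannerOverride != null ? plannerOverride : defaultFor(allSourcesBounded);
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, null));               // LAZY_FROM_SOURCES
        System.out.println(resolve(true, ScheduleMode.EAGER)); // planner forces EAGER
    }
}
```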
[jira] [Updated] (FLINK-13067) Fix broken links to contributing docs
[ https://issues.apache.org/jira/browse/FLINK-13067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-13067: - Fix Version/s: 1.9.0 > Fix broken links to contributing docs > - > > Key: FLINK-13067 > URL: https://issues.apache.org/jira/browse/FLINK-13067 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.9.0 > > > As contributing links change on [https://github.com/apache/flink-web], all > links to contributing related docs have become broken. We need to fix these > broken links. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13067) Fix broken links to contributing docs
Yun Tang created FLINK-13067: Summary: Fix broken links to contributing docs Key: FLINK-13067 URL: https://issues.apache.org/jira/browse/FLINK-13067 Project: Flink Issue Type: Bug Components: Documentation Reporter: Yun Tang Assignee: Yun Tang As contributing links change on [https://github.com/apache/flink-web], all links to contributing related docs have become broken. We need to fix these broken links.
[GitHub] [flink] zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk commented on a change in pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#discussion_r299756509 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategyFactoryLoader.java ## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartBackoffTimeStrategyOptions; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A utility class to load {@link RestartBackoffTimeStrategy.Factory} from the configuration. + */ +public class RestartBackoffTimeStrategyFactoryLoader { + + private static final Logger LOG = LoggerFactory.getLogger(RestartBackoffTimeStrategyFactoryLoader.class); + + private static final String CREATE_METHOD = "createFactory"; + + /** +* Creates proper {@link RestartBackoffTimeStrategy.Factory}. +* If new version restart strategy is specified, will directly use it. +* Otherwise will decide based on legacy restart strategy configs. 
+* +* @param jobRestartStrategyConfiguration restart configuration given within the job graph +* @param clusterConfiguration cluster(server-side) configuration, usually represented as jobmanager config +* @param isCheckpointingEnabled if checkpointing was enabled for the job +* @return new version restart strategy factory +*/ + public static RestartBackoffTimeStrategy.Factory createRestartStrategyFactory( + final RestartStrategies.RestartStrategyConfiguration jobRestartStrategyConfiguration, + final Configuration clusterConfiguration, + final boolean isCheckpointingEnabled) throws Exception { + + checkNotNull(jobRestartStrategyConfiguration); + checkNotNull(clusterConfiguration); + + final String restartStrategyClassName = clusterConfiguration.getString( + RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_CLASS_NAME); + + if (restartStrategyClassName != null) { + // create new restart strategy directly if it is specified in cluster config + return createRestartStrategyFactoryInternal(clusterConfiguration); + } else { + // adapt the legacy restart strategy configs as new restart strategy configs + final Configuration adaptedConfiguration = getAdaptedConfiguration( + jobRestartStrategyConfiguration, + clusterConfiguration, + isCheckpointingEnabled); + + // create new restart strategy from the adapted config + return createRestartStrategyFactoryInternal(adaptedConfiguration); + } + } + + /** +* Decides the {@link RestartBackoffTimeStrategy} to use and its params based on legacy configs, +* and records its class name and the params into a adapted configuration. +* +* The decision making is as follows: +* +* Use strategy of {@link RestartStrategies.RestartStrategyConfiguration}, unless it's a +* {@link
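The decision flow described in the Javadoc above — prefer an explicitly configured new-generation strategy, otherwise adapt the legacy restart-strategy configuration — can be sketched compactly (the config key, return labels, and the fixed-delay fallback for checkpointed jobs are simplifications of the quoted code, not its real signatures):

```java
// Sketch of the loader's decision flow: a configured new-version strategy
// class wins outright; otherwise legacy semantics apply, where an enabled
// checkpointing configuration defaults to a fixed-delay restart.
import java.util.Map;

public class LoaderSketch {

    public static final String STRATEGY_CLASS_KEY = "restart-backoff-time-strategy.class";

    /** Returns a label describing which path the loader would take. */
    public static String decide(Map<String, String> clusterConfig, boolean checkpointingEnabled) {
        String className = clusterConfig.get(STRATEGY_CLASS_KEY);
        if (className != null) {
            // New-version strategy specified: job-level legacy config is ignored.
            return "new-strategy:" + className;
        }
        // No new strategy configured: fall back to legacy behavior. With
        // checkpointing enabled and nothing specified, restart with fixed delay.
        return checkpointingEnabled ? "fixed-delay" : "no-restart";
    }

    public static void main(String[] args) {
        System.out.println(decide(Map.of(), true));                         // fixed-delay
        System.out.println(decide(Map.of(STRATEGY_CLASS_KEY, "MyStrategy"), true));
    }
}
```

This mirrors the test case `testNoStrategySpecifiedWhenCheckpointingEnabled` quoted later in this thread, which expects a fixed-delay factory when checkpointing is on and no strategy is configured.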
[GitHub] [flink] zhijiangW commented on issue #7197: [FLINK-12909][flink-runtime][network] continue trying to find a suitable spilling channel even in case of exceptions
zhijiangW commented on issue #7197: [FLINK-12909][flink-runtime][network] continue trying to find a suitable spilling channel even in case of exceptions URL: https://github.com/apache/flink/pull/7197#issuecomment-507915036 Thanks for the further reviews @NicoK ! Sorry for missing your previous comment about the unit test. I also think it is better to add a test covering this change. We might construct the `SpillingAdaptiveSpanningRecordDeserializer` with some invalid `tmpDirectories`, then verify that the retry attempts work in `createSpillingChannel`.
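The retry behavior under discussion can be illustrated with a stand-alone sketch (an assumed simplification of what `createSpillingChannel` does, not the real deserializer code): keep trying the remaining temp directories when one fails, and give up only after all of them have failed.

```java
// Sketch: continue trying to find a suitable spilling location even when a
// directory throws, instead of aborting on the first IOException.
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SpillingChannelSketch {

    /** Tries each candidate directory in turn; fails only if all of them fail. */
    public static File createSpillingFile(String[] tmpDirectories) {
        IOException lastFailure = null;
        for (String dir : tmpDirectories) {
            try {
                File parent = new File(dir);
                if (!parent.isDirectory()) {
                    throw new IOException("not a directory: " + dir);
                }
                return File.createTempFile("spill", ".tmp", parent);
            } catch (IOException e) {
                lastFailure = e; // remember the failure and try the next directory
            }
        }
        throw new UncheckedIOException("no suitable spilling directory found", lastFailure);
    }

    public static void main(String[] args) {
        // An invalid directory followed by a valid one: the retry succeeds.
        File spill = createSpillingFile(
            new String[]{"/definitely/invalid/dir", System.getProperty("java.io.tmpdir")});
        System.out.println(spill.exists());
        spill.delete();
    }
}
```

A test in the spirit zhijiangW describes would pass a mix of invalid and valid directories and assert that a file is still created.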
[GitHub] [flink] flinkbot commented on issue #8957: [hotfix][docs] fix broken links
flinkbot commented on issue #8957: [hotfix][docs] fix broken links URL: https://github.com/apache/flink/pull/8957#issuecomment-507914700 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] sjwiesman commented on issue #8957: [hotfix][docs] fix broken links
sjwiesman commented on issue #8957: [hotfix][docs] fix broken links URL: https://github.com/apache/flink/pull/8957#issuecomment-507914461 cc @knaufk
[GitHub] [flink] sjwiesman opened a new pull request #8957: [hotfix][docs] fix broken links
sjwiesman opened a new pull request #8957: [hotfix][docs] fix broken links URL: https://github.com/apache/flink/pull/8957 ## What is the purpose of the change Update broken links in the documentation introduced by FLINK-12639
[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8922#discussion_r299754527 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class contains scheduling logic for EAGER and LAZY_FROM_SOURCES. + * It is used for normal scheduling and legacy failover strategy re-scheduling. + */ +public class SchedulingUtils { + + /** +* Schedule vertices lazy. That means only vertices satisfying its input constraint will be scheduled. +* +* @param vertices Topologically sorted vertices to schedule. +* @param executionGraph The graph the given vertices belongs to. 
+*/ + public static CompletableFuture<Void> scheduleLazy( + final Iterable<ExecutionVertex> vertices, + final ExecutionGraph executionGraph) { + + executionGraph.assertRunningInJobMasterMainThread(); + + final Set<AllocationID> previousAllocations = computePriorAllocationIdsIfRequiredByScheduling( + vertices, executionGraph.getSlotProvider()); + + final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(); + for (ExecutionVertex executionVertex : vertices) { + // only schedule vertex when its input constraint is satisfied + if (executionVertex.getJobVertex().getJobVertex().isInputVertex() || + executionVertex.checkInputDependencyConstraints()) { Review comment: current `executionVertex.checkInputDependencyConstraints` does not properly deal with the input vertex case. If the constraint is ANY and the vertex has no input, it will return false. Maybe we can fix `checkInputDependencyConstraints` and simplify the check here?
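The fix suggested in the review — letting the constraint check itself treat input (source) vertices as schedulable, so the caller's extra `isInputVertex()` test becomes unnecessary — might look like this simplified sketch (the types are hypothetical stand-ins for `ExecutionVertex`, not Flink's real ones):

```java
// Sketch: an input-dependency check that handles source vertices directly.
// A vertex with no inputs is always schedulable; otherwise ANY requires at
// least one consumable input and ALL requires every input to be consumable.
public class InputConstraintSketch {

    public enum Constraint { ANY, ALL }

    public static boolean inputConstraintSatisfied(boolean[] inputsConsumable, Constraint constraint) {
        if (inputsConsumable.length == 0) {
            return true; // source vertex: no inputs to wait for
        }
        boolean any = false;
        boolean all = true;
        for (boolean consumable : inputsConsumable) {
            any |= consumable;
            all &= consumable;
        }
        return constraint == Constraint.ANY ? any : all;
    }

    public static void main(String[] args) {
        // The case from the review: ANY with no inputs used to return false.
        System.out.println(inputConstraintSatisfied(new boolean[0], Constraint.ANY)); // true
    }
}
```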
[jira] [Assigned] (FLINK-12833) Add Klaviyo to Chinese PoweredBy page
[ https://issues.apache.org/jira/browse/FLINK-12833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yelun reassigned FLINK-12833: - Assignee: yelun > Add Klaviyo to Chinese PoweredBy page > - > > Key: FLINK-12833 > URL: https://issues.apache.org/jira/browse/FLINK-12833 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Fabian Hueske >Assignee: yelun >Priority: Major > > Commit b54ecfa930653bcfecd60df3414deca5291c6cb3 added Klaviyo to the English > PoweredBy page. > It should be added to the Chinese page as well.
[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8922#discussion_r299754065 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.SchedulingUtils; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This 
failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. + */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. Fallback to fail global."); + failGlobal(cause); + return; + } + + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; + } + + final ExecutionVertexID vertexID =
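The two guards at the top of `onTaskFailure` above — fall back to a global failover when the restart strategy forbids restarting, and skip the regional failover while a global one is ongoing — reduce to a small decision function (a simplified stand-in for illustration, not the real strategy class):

```java
// Sketch of the failover guards in AdaptedRestartPipelinedRegionStrategyNG's
// onTaskFailure, collapsed into a pure function returning the chosen action.
public class FailoverGuardSketch {

    public static String onTaskFailure(boolean canRestart,
                                       long observedGlobalModVersion,
                                       long currentGlobalModVersion) {
        if (!canRestart) {
            return "fail-global";    // restart strategy validation failed
        }
        if (observedGlobalModVersion != currentGlobalModVersion) {
            return "skip";           // a global failover is already ongoing
        }
        return "restart-region";     // proceed with the regional failover
    }

    public static void main(String[] args) {
        System.out.println(onTaskFailure(true, 3L, 3L)); // restart-region
    }
}
```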
[jira] [Commented] (FLINK-12833) Add Klaviyo to Chinese PoweredBy page
[ https://issues.apache.org/jira/browse/FLINK-12833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877431#comment-16877431 ] yelun commented on FLINK-12833: --- [~fhueske] hi, could you explain "Klaviyo" in detail? And I did not find the commit id b54ecfa930653bcfecd60df3414deca5291c6cb3 in the master branch. Thanks. > Add Klaviyo to Chinese PoweredBy page > - > > Key: FLINK-12833 > URL: https://issues.apache.org/jira/browse/FLINK-12833 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Fabian Hueske >Priority: Major > > Commit b54ecfa930653bcfecd60df3414deca5291c6cb3 added Klaviyo to the English > PoweredBy page. > It should be added to the Chinese page as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] Xpray commented on issue #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client
Xpray commented on issue #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client URL: https://github.com/apache/flink/pull/8756#issuecomment-507911734 @StephanEwen , thanks for the review, I'll try to catch up with the latest changes on the shuffle service with @zhijiangW first. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-12943) Translate "HDFS Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12943. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: 345ac8868b705b7c9be9bb70e6fb54d1d15baa9b > Translate "HDFS Connector" page into Chinese > > > Key: FLINK-12943 > URL: https://issues.apache.org/jira/browse/FLINK-12943 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html" > into Chinese. > > The doc located in "flink/docs/dev/connectors/filesystem_sink.zh.md"
[jira] [Closed] (FLINK-12944) Translate "Streaming File Sink" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12944. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: 29a0e8e5e7d69b0450ac3865c25df0e5d75d758b > Translate "Streaming File Sink" page into Chinese > - > > Key: FLINK-12944 > URL: https://issues.apache.org/jira/browse/FLINK-12944 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/streamfile_sink.zh.md"
[GitHub] [flink] asfgit closed pull request #8838: [FLINK-12946][docs]Translate Apache NiFi Connector page into Chinese
asfgit closed pull request #8838: [FLINK-12946][docs]Translate Apache NiFi Connector page into Chinese URL: https://github.com/apache/flink/pull/8838
[GitHub] [flink] asfgit closed pull request #8918: [FLINK-12944][docs]Translate Streaming File Sink page into Chinese
asfgit closed pull request #8918: [FLINK-12944][docs]Translate Streaming File Sink page into Chinese URL: https://github.com/apache/flink/pull/8918
[GitHub] [flink] asfgit closed pull request #8843: [FLINK-12945][docs] Translate RabbitMQ Connector page into Chinese
asfgit closed pull request #8843: [FLINK-12945][docs] Translate RabbitMQ Connector page into Chinese URL: https://github.com/apache/flink/pull/8843
[GitHub] [flink] asfgit closed pull request #8897: [FLINK-12943][docs]Translate HDFS Connector page into Chinese
asfgit closed pull request #8897: [FLINK-12943][docs]Translate HDFS Connector page into Chinese URL: https://github.com/apache/flink/pull/8897
[GitHub] [flink] asfgit closed pull request #8837: [FLINK-12938][docs]Translate "Streaming Connectors" page into Chinese
asfgit closed pull request #8837: [FLINK-12938][docs]Translate "Streaming Connectors" page into Chinese URL: https://github.com/apache/flink/pull/8837
[jira] [Closed] (FLINK-12946) Translate "Apache NiFi Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12946. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: 5caed717570e14e0d45c49eccf409c3170034af1 > Translate "Apache NiFi Connector" page into Chinese > --- > > Key: FLINK-12946 > URL: https://issues.apache.org/jira/browse/FLINK-12946 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/nifi.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/nifi.zh.md"
[jira] [Closed] (FLINK-12938) Translate "Streaming Connectors" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12938. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: bda485fc077453ff0a555c3f25e702bfd0a1f339 > Translate "Streaming Connectors" page into Chinese > -- > > Key: FLINK-12938 > URL: https://issues.apache.org/jira/browse/FLINK-12938 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Translate the page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/" into > Chinese. > The doc located in "flink/docs/dev/connectors/index.zh.md"
[jira] [Closed] (FLINK-12945) Translate "RabbitMQ Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12945. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: 6bf207166a5ca09c259d854989ccff18687745bc > Translate "RabbitMQ Connector" page into Chinese > > > Key: FLINK-12945 > URL: https://issues.apache.org/jira/browse/FLINK-12945 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/rabbitmq.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/rabbitmq.zh.md"
[GitHub] [flink] klion26 commented on issue #8895: [FLINK-12536][Runtime/Network]Make BufferOrEventSequence#getNext non blocking
klion26 commented on issue #8895: [FLINK-12536][Runtime/Network]Make BufferOrEventSequence#getNext non blocking URL: https://github.com/apache/flink/pull/8895#issuecomment-507907989 @StephanEwen thanks for the comments, I'll continue after the discussion has been resolved.
[jira] [Commented] (FLINK-11638) Translate "Savepoints" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877415#comment-16877415 ] Congxian Qiu(klion26) commented on FLINK-11638: --- [~guanghui] I'm not working on it, but someone has a PR [1] for it, and I think the PR is almost done. [1] [https://github.com/apache/flink/pull/8300] > Translate "Savepoints" page into Chinese > > > Key: FLINK-11638 > URL: https://issues.apache.org/jira/browse/FLINK-11638 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Assignee: yelun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > doc locates in flink/docs/ops/state/savepoints.zh.md
[jira] [Commented] (FLINK-11638) Translate "Savepoints" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877413#comment-16877413 ] yelun commented on FLINK-11638: --- [~klion26] hi, are you still working on this issue? > Translate "Savepoints" page into Chinese > > > Key: FLINK-11638 > URL: https://issues.apache.org/jira/browse/FLINK-11638 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Assignee: yelun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > doc locates in flink/docs/ops/state/savepoints.zh.md
[GitHub] [flink] dianfu commented on issue #8956: [FLINK-12991][python] Correct the implementation of Catalog.get_table_factory
dianfu commented on issue #8956: [FLINK-12991][python] Correct the implementation of Catalog.get_table_factory URL: https://github.com/apache/flink/pull/8956#issuecomment-507906136 @bowenli86 Could you take a look and see whether this change makes sense to you?
[GitHub] [flink] flinkbot commented on issue #8956: [FLINK-12991][python] Correct the implementation of Catalog.get_table_factory
flinkbot commented on issue #8956: [FLINK-12991][python] Correct the implementation of Catalog.get_table_factory URL: https://github.com/apache/flink/pull/8956#issuecomment-507905839 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] dianfu opened a new pull request #8956: [FLINK-12991][python] Correct the implementation of Catalog.get_table_factory
dianfu opened a new pull request #8956: [FLINK-12991][python] Correct the implementation of Catalog.get_table_factory URL: https://github.com/apache/flink/pull/8956 ## What is the purpose of the change *Currently the implementation of Catalog.get_table_factory returns a Java TableFactory, which is not friendly for Python users. After taking a look at the implementation, it seems that this method will only be used internally on the Java side. So I think it's not necessary to add it to the Python API. This PR fixes the issue by removing this method from the Python Catalog.* ## Brief change log - *Removes Catalog.get_table_factory* ## Verifying this change This change is a trivial rework without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Updated] (FLINK-12991) Correct the implementation of Catalog.get_table_factory
[ https://issues.apache.org/jira/browse/FLINK-12991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12991: --- Labels: pull-request-available (was: )

> Correct the implementation of Catalog.get_table_factory
> -------------------------------------------------------
>
> Key: FLINK-12991
> URL: https://issues.apache.org/jira/browse/FLINK-12991
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Reporter: Dian Fu
> Priority: Major
> Labels: pull-request-available
>
> The following method is added in catalog.py in FLINK-11480:
> {code}
> def get_table_factory(self):
>     """
>     Get an optional TableFactory instance that's responsible for generating
>     source/sink for tables stored in this catalog.
>
>     :return: An optional TableFactory instance.
>     """
>     return self._j_catalog.getTableFactory()
> {code}
> There is some problem with the implementation as it returns a Java TableFactory and this is not friendly for Python users. We should correct the implementation.
> Before doing that, we need to make sure of the following thing: is this method designed to be used by users, or will it only be used internally? I took a quick look at the code and it seems to me that this method will only be used internally. Maybe [~xuefuz] and [~phoenixjiangnan] can provide more information about this.
> If this method is designed to be used by users directly, we need to provide a Python TableFactory wrapper and make sure this method is usable for Python users. If this method is designed to be only used internally, then we need to remove it from the Python catalog. For the API completeness test, we can add this method to *excluded_methods* of *CatalogAPICompletenessTests* to make the tests pass.
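The API-completeness test the issue mentions can be sketched in plain Python. The method sets below are hypothetical stand-ins (the real `CatalogAPICompletenessTests` compares the Python class against the Java class reflectively); the point is that `get_table_factory` lands in the exclusion set once it is removed from the Python Catalog:

```python
# Hypothetical sketch of an API-completeness check: every Java-side method
# must be exposed on the Python side unless it is explicitly excluded.
java_catalog_methods = {"get_table", "list_tables", "get_table_factory"}
python_catalog_methods = {"get_table", "list_tables"}
excluded_methods = {"get_table_factory"}  # internal-only, removed from Python API

missing = java_catalog_methods - python_catalog_methods - excluded_methods
assert not missing, "Python Catalog is missing methods: %s" % sorted(missing)
print("completeness check passed")
```

Without the exclusion entry, the same check would flag `get_table_factory` as missing, which is why the issue calls for adding it to *excluded_methods*.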
[GitHub] [flink] TengHu commented on issue #8885: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.
TengHu commented on issue #8885: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload. URL: https://github.com/apache/flink/pull/8885#issuecomment-507905048 @rmetzger @aljoscha Hey guys, although there is already a huge number of open PRs :) I'd really appreciate it if you could take a look at this; let me know if you need more info or stats. Cheers,
[GitHub] [flink] asfgit closed pull request #8947: [hotfix][python] Remove the redundant 'python setup.py install' from tox.ini
asfgit closed pull request #8947: [hotfix][python] Remove the redundant 'python setup.py install' from tox.ini URL: https://github.com/apache/flink/pull/8947
[GitHub] [flink] flinkbot edited a comment on issue #8947: [hotfix][python] Remove the redundant 'python setup.py install' from tox.ini
flinkbot edited a comment on issue #8947: [hotfix][python] Remove the redundant 'python setup.py install' from tox.ini URL: https://github.com/apache/flink/pull/8947#issuecomment-507664999 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @sunjincheng121 [committer] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] flinkbot commented on issue #8955: [FLINK-13066][hive] append hive-site.xml to path of Hive conf dir
flinkbot commented on issue #8955: [FLINK-13066][hive] append hive-site.xml to path of Hive conf dir URL: https://github.com/apache/flink/pull/8955#issuecomment-507884466 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-13066) append hive-site.xml to path defined in 'hive-conf-dir'
[ https://issues.apache.org/jira/browse/FLINK-13066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13066: --- Labels: pull-request-available (was: ) > append hive-site.xml to path defined in 'hive-conf-dir' > --- > > Key: FLINK-13066 > URL: https://issues.apache.org/jira/browse/FLINK-13066 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 >
[GitHub] [flink] bowenli86 opened a new pull request #8955: [FLINK-13066][hive] append hive-site.xml to path of Hive conf dir
bowenli86 opened a new pull request #8955: [FLINK-13066][hive] append hive-site.xml to path of Hive conf dir URL: https://github.com/apache/flink/pull/8955 ## What is the purpose of the change This PR fixes a bug: previously we passed only the Hive conf dir to `HiveConf`, but we really should pass the path of hive-site.xml. Thus, the change is to append `hive-site.xml` to the Hive conf dir and pass the result into `HiveConf` if the Hive conf dir is not null. ## Brief change log - append `hive-site.xml` to the Hive conf dir and pass the result into `HiveConf` if the Hive conf dir is not null ## Verifying this change manual testing and verification ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
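The path handling described in the PR can be sketched in a few lines. This shows the logic only (in Python, with an illustrative function name), not the actual HiveCatalog code:

```python
import os

# Sketch of the fix described above: append the file name hive-site.xml to
# the configured conf directory before handing the path on, instead of
# passing the bare directory. hive_site_path is an illustrative name.

def hive_site_path(hive_conf_dir):
    """Return the path of hive-site.xml inside the conf dir, or None."""
    if hive_conf_dir is None:
        return None
    return os.path.join(hive_conf_dir, "hive-site.xml")

print(hive_site_path("/etc/hive/conf"))
print(hive_site_path(None))
```

The null check mirrors the PR's "if Hive conf dir is not null" condition: with no conf dir configured, nothing is passed on.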
[jira] [Created] (FLINK-13066) append hive-site.xml to path defined in 'hive-conf-dir'
Bowen Li created FLINK-13066: Summary: append hive-site.xml to path defined in 'hive-conf-dir' Key: FLINK-13066 URL: https://issues.apache.org/jira/browse/FLINK-13066 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0
[GitHub] [flink] flinkbot commented on issue #8954: [FLINK-13065][doc] Corrected example snippet using KeySelector
flinkbot commented on issue #8954: [FLINK-13065][doc] Corrected example snippet using KeySelector URL: https://github.com/apache/flink/pull/8954#issuecomment-507873435 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-13065) Document example snippet correction using KeySelector
[ https://issues.apache.org/jira/browse/FLINK-13065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13065: --- Labels: correction, doc, example, pull-request-available (was: correction, doc, example) > Document example snippet correction using KeySelector > - > > Key: FLINK-13065 > URL: https://issues.apache.org/jira/browse/FLINK-13065 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Mans Singh >Assignee: Mans Singh >Priority: Minor > Labels: correction, doc, example, pull-request-available > > The broadcast state > [example|https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/broadcast_state.html#provided-apis] > states: > > {noformat} > Starting from the stream of Items, we just need to key it by Color, as we > want pairs of the same color. This will make sure that elements of the same > color end up on the same physical machine. > // key the shapes by color > KeyedStream<Item, Color> colorPartitionedStream = shapeStream > .keyBy(new KeySelector<Shape, Color>(){...});{noformat} > > However, it uses shape stream and KeySelector<Shape, Color> but should use > KeySelector<Item, Color> to create KeyedStream<Item, Color>.
[GitHub] [flink] mans2singh opened a new pull request #8954: [FLINK-13065][doc] Corrected example snippet using KeySelector
mans2singh opened a new pull request #8954: [FLINK-13065][doc] Corrected example snippet using KeySelector URL: https://github.com/apache/flink/pull/8954 ## What is the purpose of the change * Corrected example snippet ## Brief change log * Corrected usage of KeySelector ## Verifying this change This change is a trivial update to documentation ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no
[jira] [Updated] (FLINK-13065) Document example snippet correction using KeySelector
[ https://issues.apache.org/jira/browse/FLINK-13065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mans Singh updated FLINK-13065: --- Description: The broadcast state [example|[https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/broadcast_state.html#provided-apis]] states: {noformat} Starting from the stream of Items, we just need to key it by Color, as we want pairs of the same color. This will make sure that elements of the same color end up on the same physical machine. // key the shapes by color KeyedStream colorPartitionedStream = shapeStream .keyBy(new KeySelector(){...});{noformat} However, it uses shape stream and KeySelector but should use KeySelector to create KeyedStream. was: The broadcast state [example|[https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/broadcast_state.html#provided-apis]] states: {noformat} Starting from the stream of Items, we just need to key it by Color, as we want pairs of the same color. This will make sure that elements of the same color end up on the same physical machine. // key the shapes by color KeyedStream colorPartitionedStream = shapeStream .keyBy(new KeySelector(){...});{noformat} How it uses shape stream and use KeySelector but should use KeySelector to create KeyedStream. > Document example snippet correction using KeySelector > - > > Key: FLINK-13065 > URL: https://issues.apache.org/jira/browse/FLINK-13065 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Mans Singh >Assignee: Mans Singh >Priority: Minor > Labels: correction, doc,, example > > The broadcast state > [example|[https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/broadcast_state.html#provided-apis]] > states: > > {noformat} > Starting from the stream of Items, we just need to key it by Color, as we > want pairs of the same color. This will make sure that elements of the same > color end up on the same physical machine. 
> // key the shapes by color > KeyedStream<Item, Color> colorPartitionedStream = shapeStream > .keyBy(new KeySelector<Shape, Color>(){...});{noformat} > > However, it uses the shape stream and KeySelector<Shape, Color> but should use > KeySelector<Item, Color> to create the KeyedStream<Item, Color>. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
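The typing mismatch the issue describes can be illustrated with a simplified stand-in for Flink's `KeySelector<IN, KEY>` interface (the `Item`, `Color`, and `keyBy` definitions below are illustrative stand-ins, not Flink's actual classes): the selector's input type parameter must match the element type of the stream being keyed.

```java
// Simplified stand-ins for Flink's KeySelector/keyBy typing, to show why
// the generic parameters must match the stream's element type.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeySelectorTyping {
    // Illustrative stand-in for org.apache.flink.api.java.functions.KeySelector<IN, KEY>
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    record Color(String name) {}
    record Item(Color color, String shape) {}

    // Keying a stream of Items by Color requires a KeySelector<Item, Color>:
    // the input type is the element type of the stream, not some other type.
    static Map<Color, List<Item>> keyBy(List<Item> items, KeySelector<Item, Color> selector) {
        return items.stream().collect(Collectors.groupingBy(selector::getKey));
    }

    public static void main(String[] args) {
        List<Item> items = List.of(
            new Item(new Color("red"), "square"),
            new Item(new Color("red"), "circle"),
            new Item(new Color("blue"), "square"));
        Map<Color, List<Item>> byColor = keyBy(items, item -> item.color());
        // Both red items land under the same key.
        System.out.println(byColor.get(new Color("red")).size());
    }
}
```

A selector typed against the wrong element type (the `KeySelector<Shape, Color>` in the quoted snippet) would not compile against a stream of `Item`, which is the correction the issue proposes.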
[jira] [Created] (FLINK-13065) Document example snippet correction using KeySelector
Mans Singh created FLINK-13065: -- Summary: Document example snippet correction using KeySelector Key: FLINK-13065 URL: https://issues.apache.org/jira/browse/FLINK-13065 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Mans Singh Assignee: Mans Singh The broadcast state [example|https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/broadcast_state.html#provided-apis] states: {noformat} Starting from the stream of Items, we just need to key it by Color, as we want pairs of the same color. This will make sure that elements of the same color end up on the same physical machine. // key the shapes by color KeyedStream<Item, Color> colorPartitionedStream = shapeStream .keyBy(new KeySelector<Shape, Color>(){...});{noformat} However, it uses the shape stream and KeySelector<Shape, Color> but should use KeySelector<Item, Color> to create the KeyedStream<Item, Color>. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client
StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client URL: https://github.com/apache/flink/pull/8756#discussion_r299700273 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java ## @@ -158,6 +159,14 @@ */ Map>> getAccumulatorsSerialized(); + /** +* Returns a {@link BlockingPersistentResultPartitionMeta}. +* @return BlockingPersistentResultPartitionMeta contains ResultPartition locations +*/ + default BlockingPersistentResultPartitionMeta getBlockingPersistentResultPartitionMeta() { Review comment: I think this is a tricky use of default methods: this is not a working implementation for all subclasses, and default methods on interfaces should only be used when the default body is valid for every implementation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
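The reviewer's concern can be sketched with a minimal example (the interface and names below are illustrative, not Flink's API): a default method whose body is not a correct implementation for every subclass turns a missing override from a compile-time error into a runtime failure.

```java
// Anti-pattern sketch: a default method that is not a working
// implementation for all subclasses hides missing overrides.
public class DefaultMethodPitfall {
    // Illustrative stand-in for an accessor interface like AccessExecutionGraph.
    interface GraphAccess {
        String jobName();

        // If this default were absent, every implementation would be forced
        // (by the compiler) to provide the method. With it, callers only
        // discover at runtime that a concrete class forgot to override it.
        default String partitionMeta() {
            throw new UnsupportedOperationException(
                "partitionMeta not supported by " + getClass().getName());
        }
    }

    public static void main(String[] args) {
        // The lambda implements only jobName(); partitionMeta() silently
        // falls back to the throwing default.
        GraphAccess g = () -> "job-1";
        System.out.println(g.jobName());
        try {
            g.partitionMeta();
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported");
        }
    }
}
```

This is why default methods are usually reserved for behavior that is genuinely derivable for every implementation (for example, convenience overloads delegating to the abstract method).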
[GitHub] [flink] StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client
StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client URL: https://github.com/apache/flink/pull/8756#discussion_r299699717 ## File path: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ## @@ -118,6 +119,9 @@ private final ExecutionConfig config = new ExecutionConfig(); + private final BlockingPersistentResultPartitionMeta blockingPersistentResultPartitionMeta = Review comment: This should probably not be in a separate field, but in the `lastJobExecutionResult`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client
StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client URL: https://github.com/apache/flink/pull/8756#discussion_r299701049 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -814,6 +820,69 @@ public Executor getFutureExecutor() { entry -> serializeAccumulator(entry.getKey(), entry.getValue()))); } + @Override + public BlockingPersistentResultPartitionMeta getBlockingPersistentResultPartitionMeta() { + Map> resultPartitionDescriptors = new HashMap<>(); + + // keep record of all failed IntermediateDataSetID + Set<AbstractID> failedIntermediateDataSetIds = new HashSet<>(); + + for (ExecutionVertex executionVertex : getAllExecutionVertices()) { + for (IntermediateResultPartition intermediateResultPartition : executionVertex.getProducedPartitions().values()) { + if (intermediateResultPartition.getResultType() == ResultPartitionType.BLOCKING_PERSISTENT) { + try { + addLocation(resultPartitionDescriptors, intermediateResultPartition); + } catch (Throwable throwable) { + LOG.error("Failed to get location of ResultPartition: " + intermediateResultPartition.getPartitionId(), throwable); + failedIntermediateDataSetIds.add( + new AbstractID(intermediateResultPartition.getIntermediateResult().getId())); + } + } + } + } + + return new BlockingPersistentResultPartitionMeta(resultPartitionDescriptors, failedIntermediateDataSetIds); + } + + /** +* +* @param resultPartitionDescriptors +* @param intermediateResultPartition +* Throws an exception if any error occurs. 
+*/ + public void addLocation( + Map> resultPartitionDescriptors, + IntermediateResultPartition intermediateResultPartition) { + + IntermediateDataSetID dataSetID = intermediateResultPartition.getIntermediateResult().getId(); + + Map map = resultPartitionDescriptors.computeIfAbsent( + new AbstractID(dataSetID), key -> new HashMap<>() + ); + + TaskManagerLocation taskManagerLocation = null; + + // The taskManagerLocation should be ready already since the previous job is done. + try { + taskManagerLocation = intermediateResultPartition + .getProducer().getCurrentExecutionAttempt().getTaskManagerLocationFuture().get(1, TimeUnit.SECONDS); Review comment: This is a blocking wait, which cannot be used in a non-blocking data structure like the execution graph. The call to the future needs to complete or fail instantly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
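The non-blocking alternative the reviewer asks for can be sketched with plain `CompletableFuture` (the helper name and messages below are illustrative, not Flink's code): instead of `get(1, TimeUnit.SECONDS)`, which parks the calling thread, read the value only if the future is already complete and fail immediately otherwise.

```java
import java.util.concurrent.CompletableFuture;

public class NonBlockingFutureRead {
    // Illustrative helper: return the future's value without blocking.
    // If the future has not completed yet, fail right away instead of
    // waiting with get(timeout).
    static <T> T completeNowOrThrow(CompletableFuture<T> future, String what) {
        T value = future.getNow(null); // returns null when not yet complete
        if (value == null) {
            throw new IllegalStateException(what + " is not available yet");
        }
        return value;
    }

    public static void main(String[] args) {
        // Already-completed future: the value is returned immediately.
        CompletableFuture<String> done = CompletableFuture.completedFuture("tm-host-1");
        System.out.println(completeNowOrThrow(done, "TaskManager location"));

        // Pending future: the call fails instantly instead of blocking.
        CompletableFuture<String> pending = new CompletableFuture<>();
        try {
            completeNowOrThrow(pending, "TaskManager location");
        } catch (IllegalStateException e) {
            System.out.println("not ready");
        }
    }
}
```

With this pattern, the caller's thread never sleeps inside the data structure; an incomplete location surfaces as an immediate, handleable error, matching the reviewer's "complete or fail instantly" requirement.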
[GitHub] [flink] StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client
StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client URL: https://github.com/apache/flink/pull/8756#discussion_r299701522 ## File path: flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java ## @@ -66,6 +74,49 @@ public void mapPartition(Iterable values, Collector out) throw assertEquals(PARALLELISM, resultCollection.size()); } + @Test + public void testAccessingBlockingPersistentResultPartition() throws Exception { Review comment: I am not sure if this test is well placed in the ExecutionEnvironmentITCase. It does not test the ExecutionEnvironment; it tests a different feature. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client
StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client URL: https://github.com/apache/flink/pull/8756#discussion_r299700721 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -814,6 +820,69 @@ public Executor getFutureExecutor() { entry -> serializeAccumulator(entry.getKey(), entry.getValue()))); } + @Override + public BlockingPersistentResultPartitionMeta getBlockingPersistentResultPartitionMeta() { + Map> resultPartitionDescriptors = new HashMap<>(); + + // keep record of all failed IntermediateDataSetID + Set<AbstractID> failedIntermediateDataSetIds = new HashSet<>(); + + for (ExecutionVertex executionVertex : getAllExecutionVertices()) { + for (IntermediateResultPartition intermediateResultPartition : executionVertex.getProducedPartitions().values()) { + if (intermediateResultPartition.getResultType() == ResultPartitionType.BLOCKING_PERSISTENT) { + try { + addLocation(resultPartitionDescriptors, intermediateResultPartition); + } catch (Throwable throwable) { + LOG.error("Failed to get location of ResultPartition: " + intermediateResultPartition.getPartitionId(), throwable); + failedIntermediateDataSetIds.add( + new AbstractID(intermediateResultPartition.getIntermediateResult().getId())); + } + } + } + } + + return new BlockingPersistentResultPartitionMeta(resultPartitionDescriptors, failedIntermediateDataSetIds); + } + + /** +* +* @param resultPartitionDescriptors +* @param intermediateResultPartition +* Throws an exception if any error occurs. +*/ + public void addLocation( Review comment: This utility method should not be in the ExecutionGraph - it just blows up the public signature. It should be private static or moved to a utility class. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services