(flink) branch release-1.17 updated: [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 1419619b071 [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
1419619b071 is described below

commit 1419619b07122b5c734105311d8eff939a687596
Author: Feng Jiajie
AuthorDate: Mon Jan 29 10:39:27 2024 +0800

    [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
---
 .../flink/runtime/state/TaskExecutorLocalStateStoresManager.java | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index b4dcd6af729..d56601ee5d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 /**
  * This class holds the all {@link TaskLocalStateStoreImpl} objects for a task executor (manager).
@@ -296,9 +297,11 @@ public class TaskExecutorLocalStateStoresManager {
     @Nonnull
     static Collection<Path> listAllocationDirectoriesIn(File localStateRootDirectory)
             throws IOException {
-        return Files.list(localStateRootDirectory.toPath())
-                .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
-                .collect(Collectors.toList());
+        try (Stream<Path> fileListStream = Files.list(localStateRootDirectory.toPath())) {
+            return fileListStream
+                    .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
+                    .collect(Collectors.toList());
+        }
     }

     public void shutdown() {
(flink) branch master updated: [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 360c1a0831b [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
360c1a0831b is described below

commit 360c1a0831b64bfb79699d81325c8f9783517be1
Author: Feng Jiajie
AuthorDate: Mon Jan 29 10:39:27 2024 +0800

    [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
---
 .../flink/runtime/state/TaskExecutorLocalStateStoresManager.java | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 04b0c136fbf..b58b7fbde93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -46,6 +46,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 /**
  * This class holds the all {@link TaskLocalStateStoreImpl} objects for a task executor (manager).
@@ -293,9 +294,11 @@ public class TaskExecutorLocalStateStoresManager {
     @Nonnull
     static Collection<Path> listAllocationDirectoriesIn(File localStateRootDirectory)
             throws IOException {
-        return Files.list(localStateRootDirectory.toPath())
-                .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
-                .collect(Collectors.toList());
+        try (Stream<Path> fileListStream = Files.list(localStateRootDirectory.toPath())) {
+            return fileListStream
+                    .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
+                    .collect(Collectors.toList());
+        }
     }

     public void shutdown() {
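The fix in both branches follows the standard pattern for `Files.list`: the returned `Stream<Path>` holds an open `DirectoryStream` until the stream itself is closed, so it must be wrapped in try-with-resources. A minimal standalone sketch of the same pattern (the `PREFIX` constant here is a made-up stand-in for Flink's `ALLOCATION_DIR_PREFIX`, which is defined elsewhere in the class):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ListDirsExample {
    // Illustrative stand-in for Flink's ALLOCATION_DIR_PREFIX constant.
    static final String PREFIX = "aid_";

    // Same shape as the fixed method: the try-with-resources block guarantees
    // the underlying DirectoryStream is closed even if filtering or collecting throws.
    static List<Path> listAllocationDirectoriesIn(Path root) throws IOException {
        try (Stream<Path> files = Files.list(root)) {
            return files
                    .filter(p -> p.getFileName().toString().startsWith(PREFIX))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("local-state");
        Files.createDirectory(root.resolve(PREFIX + "1"));
        Files.createDirectory(root.resolve("other"));
        // Only the directory with the matching prefix is returned.
        System.out.println(listAllocationDirectoriesIn(root).size());
    }
}
```

Without the try-with-resources block, each call leaks a file descriptor until the stream is garbage-collected, which is exactly the leak FLINK-33981 addresses.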
(flink) branch master updated: [FLINK-34353][table-planner] Fix unclear exception without setting minibatch size when enable minibatch optimization
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new de2d175decc [FLINK-34353][table-planner] Fix unclear exception without setting minibatch size when enable minibatch optimization
de2d175decc is described below

commit de2d175decce8defeb7931f449392e47d637e2c4
Author: xuyang
AuthorDate: Sun Feb 4 18:17:33 2024 +0800

    [FLINK-34353][table-planner] Fix unclear exception without setting minibatch size when enable minibatch optimization

    This closes #24264
---
 .../table/planner/plan/utils/MinibatchUtil.java    |  7 +-
 .../planner/plan/stream/sql/agg/AggregateTest.xml  | 21 +
 .../plan/stream/sql/agg/AggregateTest.scala        | 27 +-
 .../planner/plan/stream/sql/join/JoinTest.scala    | 25 
 4 files changed, 69 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
index d6677a18107..991fac72181 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
@@ -68,6 +68,11 @@ public class MinibatchUtil {
      * @return mini batch size
      */
     public static long miniBatchSize(ReadableConfig config) {
-        return config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
+        long size = config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
+        if (size <= 0) {
+            throw new IllegalArgumentException(
+                    ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE + " must be > 0.");
+        }
+        return size;
     }
 }

diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 55f72f321ad..4a36ab45186 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -139,18 +139,21 @@ GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$1, COUNT(distinc
[XML element content of this hunk was lost in extraction]
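The guard added in `MinibatchUtil` above is a general fail-fast validation pattern: validate a configuration value at the point it is read, and name the offending option in the error message instead of letting a bad value surface later as an unclear exception. A standalone sketch of the same idea, using a plain `Map` in place of Flink's `ReadableConfig` and a hypothetical key constant mirroring `table.exec.mini-batch.size`:

```java
import java.util.Map;

public class MinibatchConfigExample {
    // Hypothetical key, mirroring Flink's table.exec.mini-batch.size option name.
    static final String MINIBATCH_SIZE_KEY = "table.exec.mini-batch.size";

    // Same guard as the patch: reject non-positive sizes immediately,
    // and include the option name so the user knows what to fix.
    static long miniBatchSize(Map<String, Long> config) {
        long size = config.getOrDefault(MINIBATCH_SIZE_KEY, -1L);
        if (size <= 0) {
            throw new IllegalArgumentException(MINIBATCH_SIZE_KEY + " must be > 0.");
        }
        return size;
    }

    public static void main(String[] args) {
        // A valid size passes through unchanged.
        System.out.println(miniBatchSize(Map.of(MINIBATCH_SIZE_KEY, 1000L)));
        // A missing/unset size fails fast with a clear message.
        try {
            miniBatchSize(Map.of());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```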
(flink) 02/03: [FLINK-34247][doc] Update the usage of "env.java.home" in doc.
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 04dd91f2b6c830b9ac0e445f72938e3d6f479edd Author: JunRuiLee AuthorDate: Sun Feb 4 16:40:35 2024 +0800 [FLINK-34247][doc] Update the usage of "env.java.home" in doc. --- .../docs/deployment/resource-providers/standalone/overview.md | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md b/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md index 229d028c24c..fd658ba5c4d 100644 --- a/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md +++ b/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md @@ -61,7 +61,7 @@ Flink 运行在所有*类 UNIX 环境*下,例如 **Linux**,**Mac OS X** 和 Flink 需要 master 和所有 worker 节点设置 `JAVA_HOME` 环境变量,并指向你的 Java 安装目录。 -你可以在 `conf/flink-conf.yaml` 文件中通过 `env.java.home` 配置项来设置此变量。 +你可以在 [Flink 配置文件]({{< ref "docs/deployment/config#flink-configuration-file" >}})中通过 `env.java.home` 配置项来设置此变量。 需要注意的是,该配置项必须以 flattened 的格式(即一行键值对格式)在配置文件中。 {{< top >}} @@ -82,11 +82,11 @@ cd flink-* ### 配置 Flink -在解压完文件后,你需要编辑 *conf/flink-conf.yaml* 文件来为集群配置 Flink。 +在解压完文件后,你需要编辑 [*Flink 配置文件*]({{< ref "docs/deployment/config#flink-configuration-file" >}})来为集群配置 Flink。 设置 `jobmanager.rpc.address` 配置项指向 master 节点。你也应该通过设置 `jobmanager.memory.process.size` 和 `taskmanager.memory.process.size` 配置项来定义 Flink 允许在每个节点上分配的最大内存值。 -这些值的单位是 MB。如果一些 worker 节点上有你想分配到 Flink 系统的多余内存,你可以在这些特定节点的 *conf/flink-conf.yaml* 文件中重写 `taskmanager.memory.process.size` 或 `taskmanager.memory.flink.size` 的默认值。 +这些值的单位是 MB。如果一些 worker 节点上有你想分配到 Flink 系统的多余内存,你可以在这些特定节点的 [*Flink 配置文件*]({{< ref "docs/deployment/config#flink-configuration-file" >}}) 文件中重写 `taskmanager.memory.process.size` 或 `taskmanager.memory.flink.size` 的默认值。 最后,你必须提供集群上会被用作为 worker 节点的所有节点列表,也就是运行 TaskManager 的节点。编辑文件 
*conf/workers* 并输入每个 worker 节点的 IP 或主机名。 @@ -99,7 +99,7 @@ cd flink-* - /path/to/flink/conf/flink-conf.yaml + /path/to/flink/conf/config.yaml jobmanager.rpc.address: 10.0.0.1 @@ -192,7 +192,7 @@ By default, the job manager will pick a *random port* for inter process communic ### Example: Standalone Cluster with 2 JobManagers -1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: +1. **Configure high availability mode and ZooKeeper quorum** in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): high-availability.type: zookeeper
(flink) 03/03: [FLINK-34247][doc] Update the usage of flink-conf.yaml in doc.
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e9bea09510e18c6143e6e14ca17a894abfaf92bf Author: JunRuiLee AuthorDate: Sun Feb 4 16:47:52 2024 +0800 [FLINK-34247][doc] Update the usage of flink-conf.yaml in doc. This closes #24251. --- .../content.zh/docs/connectors/table/filesystem.md | 4 +-- .../docs/deployment/advanced/historyserver.md | 2 +- docs/content.zh/docs/deployment/cli.md | 6 ++-- docs/content.zh/docs/deployment/config.md | 2 +- .../docs/deployment/filesystems/azure.md | 6 ++-- docs/content.zh/docs/deployment/filesystems/gcs.md | 6 ++-- docs/content.zh/docs/deployment/filesystems/oss.md | 6 ++-- docs/content.zh/docs/deployment/filesystems/s3.md | 12 +++ .../content.zh/docs/deployment/ha/kubernetes_ha.md | 2 +- docs/content.zh/docs/deployment/ha/zookeeper_ha.md | 4 +-- .../docs/deployment/memory/mem_migration.md| 8 ++--- .../content.zh/docs/deployment/metric_reporters.md | 2 +- .../resource-providers/native_kubernetes.md| 4 +-- .../resource-providers/standalone/docker.md| 14 .../resource-providers/standalone/kubernetes.md| 32 +- .../docs/deployment/resource-providers/yarn.md | 2 +- .../docs/deployment/security/security-ssl.md | 2 +- docs/content.zh/docs/deployment/trace_reporters.md | 2 +- .../docs/dev/datastream/execution/parallel.md | 2 +- .../datastream/fault-tolerance/checkpointing.md| 2 +- .../datastream/fault-tolerance/state_backends.md | 2 +- .../docs/dev/python/environment_variables.md | 2 +- docs/content.zh/docs/dev/table/catalogs.md | 4 +-- .../dev/table/hive-compatibility/hiveserver2.md| 2 +- .../docs/dev/table/sql-gateway/overview.md | 4 +-- docs/content.zh/docs/dev/table/sqlClient.md| 2 +- docs/content.zh/docs/ops/debugging/flame_graphs.md | 2 +- docs/content.zh/docs/ops/metrics.md| 4 +-- docs/content.zh/docs/ops/rest_api.md | 2 +- docs/content.zh/docs/ops/state/state_backends.md | 22 ++--- 
.../docs/ops/state/task_failure_recovery.md| 8 ++--- .../docs/try-flink/flink-operations-playground.md | 4 +-- docs/content/docs/connectors/table/filesystem.md | 4 +-- .../docs/deployment/advanced/historyserver.md | 2 +- docs/content/docs/deployment/cli.md| 6 ++-- docs/content/docs/deployment/config.md | 2 +- docs/content/docs/deployment/filesystems/azure.md | 6 ++-- docs/content/docs/deployment/filesystems/gcs.md| 6 ++-- docs/content/docs/deployment/filesystems/oss.md| 6 ++-- docs/content/docs/deployment/filesystems/s3.md | 12 +++ docs/content/docs/deployment/ha/kubernetes_ha.md | 2 +- docs/content/docs/deployment/ha/zookeeper_ha.md| 4 +-- .../docs/deployment/memory/mem_migration.md| 12 +++ docs/content/docs/deployment/metric_reporters.md | 2 +- .../resource-providers/native_kubernetes.md| 4 +-- .../resource-providers/standalone/docker.md| 14 .../resource-providers/standalone/kubernetes.md| 38 +++--- .../resource-providers/standalone/overview.md | 4 +-- .../docs/deployment/resource-providers/yarn.md | 2 +- .../docs/deployment/security/security-ssl.md | 2 +- docs/content/docs/deployment/trace_reporters.md| 2 +- .../docs/dev/datastream/execution/parallel.md | 2 +- .../datastream/fault-tolerance/checkpointing.md| 2 +- .../datastream/fault-tolerance/state_backends.md | 2 +- .../docs/dev/python/environment_variables.md | 2 +- docs/content/docs/dev/table/catalogs.md| 4 +-- docs/content/docs/dev/table/config.md | 4 +-- .../docs/dev/table/sql-gateway/hiveserver2.md | 2 +- .../content/docs/dev/table/sql-gateway/overview.md | 4 +-- docs/content/docs/dev/table/sqlClient.md | 2 +- docs/content/docs/ops/debugging/flame_graphs.md| 2 +- docs/content/docs/ops/metrics.md | 4 +-- docs/content/docs/ops/rest_api.md | 2 +- docs/content/docs/ops/state/savepoints.md | 2 +- docs/content/docs/ops/state/state_backends.md | 24 +++--- .../docs/ops/state/task_failure_recovery.md| 8 ++--- .../docs/try-flink/flink-operations-playground.md | 2 +- docs/content/docs/try-flink/local_installation.md 
| 2 +- 68 files changed, 189 insertions(+), 189 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/filesystem.md b/docs/content.zh/docs/connectors/table/filesystem.md index b28a41d9467..17d05045a20 100644 --- a/docs/content.zh/docs/connectors/table/filesystem.md +++ b/docs/content.zh/docs/connectors/table/filesystem.md @@ -241,8 +241,8 @@ CREATE TABLE MyUserTableWithFilepath ( **注意:** 对于 bulk
(flink) 01/03: [FLINK-34247][doc] Add documentation of new Flink configuration file config.yaml.
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5b61baadd02ccdfa702834e2e63aeb8d1d9e1250 Author: JunRuiLee AuthorDate: Sun Feb 4 16:38:55 2024 +0800 [FLINK-34247][doc] Add documentation of new Flink configuration file config.yaml. --- docs/content.zh/docs/deployment/config.md | 88 ++- docs/content/docs/deployment/config.md | 88 ++- docs/layouts/shortcodes/config_file.html| 126 docs/layouts/shortcodes/config_file_zh.html | 126 4 files changed, 424 insertions(+), 4 deletions(-) diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md index 35b85240d03..d3119d4cf8f 100644 --- a/docs/content.zh/docs/deployment/config.md +++ b/docs/content.zh/docs/deployment/config.md @@ -27,16 +27,100 @@ under the License. # 配置参数 -All configuration is done in `conf/flink-conf.yaml`, which is expected to be a flat collection of [YAML key value pairs](http://www.yaml.org/spec/1.2/spec.html) with format `key: value`. +All configuration can be set in Flink configuration file in the `conf/` directory (see [Flink Configuration File section](#flink-configuration-file)). The configuration is parsed and evaluated when the Flink processes are started. Changes to the configuration file require restarting the relevant processes. -The out of the box configuration will use your default Java installation. You can manually set the environment variable `JAVA_HOME` or the configuration key `env.java.home` in `conf/flink-conf.yaml` if you want to manually override the Java runtime to use. +The out of the box configuration will use your default Java installation. You can manually set the environment variable `JAVA_HOME` or the configuration key `env.java.home` in Flink configuration file if you want to manually override the Java runtime to use. 
You can specify a different configuration directory location by defining the `FLINK_CONF_DIR` environment variable. For resource providers which provide non-session deployments, you can specify per-job configurations this way. Make a copy of the `conf` directory from the Flink distribution and modify the settings on a per-job basis. Note that this is not supported in Docker or standalone Kubernetes deployments. On Docker-based deployments, you can use the `FLINK_PROPERTIES` environment v [...] On session clusters, the provided configuration will only be used for configuring [execution](#execution) parameters, e.g. configuration parameters affecting the job, not the underlying cluster. +# Flink 配置文件 + +自 Flink-1.19 版本起,Flink 正式引入了对标准 YAML 1.2 语法的完整支持。与之前版本中仅支持简单键值对的配置方式相比,这一更新为用户提供了更加灵活和强大的配置能力。为了利用这一新特性,用户需使用新引入的配置文件 `config.yaml`。原有的 `flink-conf.yaml` 配置文件不再推荐使用,并在即将到来的 Flink-2.0 版本中不再支持。为了确保平滑迁移,建议用户尽早将现有作业配置迁移到新的配置方式。 + +本节将帮助用户理解如何通过 `config.yaml` 配置文件对 Flink 集群和作业进行配置,以及如何将老配置迁移至新的配置文件中。 + +### 用法 + +从 Flink-1.19 版本开始,默认的配置文件已更改为 `config.yaml`,并置于 `conf/` 目录下。用户在进行配置时应直接修改此文件。 + +如果用户希望继续使用 Flink-1.19 之前的配置文件 `flink-conf.yaml`,只需将该文件拷贝到 `conf/` 目录下。一旦检测到 `flink-conf.yaml` 文件,Flink 会优先使用其作为配置文件。 + +`config.yaml` 的配置方式如下: + + Config Key + +- 用户可以按照 Nested 的格式来组织 Config Key,如: + +```config.yaml +restart-strategy: + type: failure-rate + failure-rate: +delay: 1 s +failure-rate-interval: 1 min +max-failures-per-interval: 1 +``` + +- 此外,用户也可以按 Flatten 的格式组织 Config Key,如: + +```flink-conf.yaml +restart-strategy.type: failure-rate +restart-strategy.failure-rate.delay: 1 s +restart-strategy.failure-rate.failure-rate-interval: 1 min +restart-strategy.failure-rate.max-failures-per-interval: 1 +``` + + Config Value + +`config.yaml` 配置文件支持用户按 [YAML 1.2 core schema](https://yaml.org/spec/1.2.2/#103-core-schema) 进行 value 的配置。 + +用户可按如下格式配置 Value 对应的Config Type: + +{{< config_file_zh >}} + +此外,用户还可以按字符串格式配置所有 Config Type 的 Value,只需将原始值使用单引号或双引号括起来。 + +### 从 flink-conf.yaml 迁移至 
config.yaml + 行为变更 +`config.yaml` 严格遵循 YAML1.2 语法,与 `flink-conf.yaml` 在大部分情况下都完全兼容,除了以下场景发生了行为变更: + +- Null value: +- `flink-conf.yaml`:仅支持留空。 +- `config.yaml`:支持留空, null, Null, NULL 和 `~` 。 + +- 注释: +- `flink-conf.yaml`:每一行中首次出现 `#` 以后的都认为是注释。 +- `config.yaml`:`#` 号和它前面的内容之间至少有一个空格或者 `#` 号位于一行的开头时,后续内容才认为是注释。 + +- 需要转义的特殊字符: +- `flink-conf.yaml`:仅需要对 List 和 Map的元素进行转义 + - List 中的元素如果含有分号 ";" ,则需要进行转义。 + - Map 中的元素如果含有逗号 "," 和冒号 ":" ,则需要进行转义。 +- `config.yaml`:需要对 YAML 1.2 规范中的特殊字符进行转义,特殊字符的定义见[链接](https://yaml.org/spec/1.2.2/#53-indicator-characters)。 + +- 重复Key: +- `flink-conf.yaml`:允许重复Key,取文件中最末端出现的对应Key的 key-value pair。 +- `config.yaml`:不允许重复Key,加载配置时将报错。 + +- 对非法配置的处理: +- `flink-conf.yaml`:非法的 key-value pair 将被忽略。 +- `config.yaml`:加载配置时将报错。 + + Migration Tool +为了方便用户迁移,Flink 提供了一个配置文件迁移脚本,使用这个脚本可以自动化地完成迁移过程。使用方法如下: + +- 将旧的配置文件
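The documentation above states that the nested and flattened key layouts denote the same settings. That equivalence is just a recursive flattening of nested maps into dotted keys, which the following sketch illustrates (this is an illustration of the idea, not Flink's actual configuration loader):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenConfigExample {
    // Recursively flattens a nested config map into dotted keys, e.g.
    // {restart-strategy: {type: failure-rate}} -> {"restart-strategy.type": "failure-rate"}.
    static Map<String, Object> flatten(String prefix, Map<String, Object> nested) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : nested.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                out.putAll(flatten(key, child));
            } else {
                out.put(key, e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The restart-strategy example from the docs, as an in-memory nested map.
        Map<String, Object> nested = Map.of(
                "restart-strategy", Map.of(
                        "type", "failure-rate",
                        "failure-rate", Map.of("delay", "1 s")));
        Map<String, Object> flat = flatten("", nested);
        System.out.println(flat.get("restart-strategy.type"));
        System.out.println(flat.get("restart-strategy.failure-rate.delay"));
    }
}
```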
(flink) branch master updated (6be30b16799 -> e9bea09510e)
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 6be30b16799 [FLINK-34323][table-planner] Fix named params in session window tvf new 5b61baadd02 [FLINK-34247][doc] Add documentation of new Flink configuration file config.yaml. new 04dd91f2b6c [FLINK-34247][doc] Update the usage of "env.java.home" in doc. new e9bea09510e [FLINK-34247][doc] Update the usage of flink-conf.yaml in doc. The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../content.zh/docs/connectors/table/filesystem.md | 4 +- .../docs/deployment/advanced/historyserver.md | 2 +- docs/content.zh/docs/deployment/cli.md | 6 +- docs/content.zh/docs/deployment/config.md | 88 +- .../docs/deployment/filesystems/azure.md | 6 +- docs/content.zh/docs/deployment/filesystems/gcs.md | 6 +- docs/content.zh/docs/deployment/filesystems/oss.md | 6 +- docs/content.zh/docs/deployment/filesystems/s3.md | 12 +- .../content.zh/docs/deployment/ha/kubernetes_ha.md | 2 +- docs/content.zh/docs/deployment/ha/zookeeper_ha.md | 4 +- .../docs/deployment/memory/mem_migration.md| 8 +- .../content.zh/docs/deployment/metric_reporters.md | 2 +- .../resource-providers/native_kubernetes.md| 4 +- .../resource-providers/standalone/docker.md| 14 +-- .../resource-providers/standalone/kubernetes.md| 32 +++--- .../resource-providers/standalone/overview.md | 10 +- .../docs/deployment/resource-providers/yarn.md | 2 +- .../docs/deployment/security/security-ssl.md | 2 +- docs/content.zh/docs/deployment/trace_reporters.md | 2 +- .../docs/dev/datastream/execution/parallel.md | 2 +- .../datastream/fault-tolerance/checkpointing.md| 2 +- .../datastream/fault-tolerance/state_backends.md | 2 +- .../docs/dev/python/environment_variables.md | 
2 +- docs/content.zh/docs/dev/table/catalogs.md | 4 +- .../dev/table/hive-compatibility/hiveserver2.md| 2 +- .../docs/dev/table/sql-gateway/overview.md | 4 +- docs/content.zh/docs/dev/table/sqlClient.md| 2 +- docs/content.zh/docs/ops/debugging/flame_graphs.md | 2 +- docs/content.zh/docs/ops/metrics.md| 4 +- docs/content.zh/docs/ops/rest_api.md | 2 +- docs/content.zh/docs/ops/state/state_backends.md | 22 ++-- .../docs/ops/state/task_failure_recovery.md| 8 +- .../docs/try-flink/flink-operations-playground.md | 4 +- docs/content/docs/connectors/table/filesystem.md | 4 +- .../docs/deployment/advanced/historyserver.md | 2 +- docs/content/docs/deployment/cli.md| 6 +- docs/content/docs/deployment/config.md | 88 +- docs/content/docs/deployment/filesystems/azure.md | 6 +- docs/content/docs/deployment/filesystems/gcs.md| 6 +- docs/content/docs/deployment/filesystems/oss.md| 6 +- docs/content/docs/deployment/filesystems/s3.md | 12 +- docs/content/docs/deployment/ha/kubernetes_ha.md | 2 +- docs/content/docs/deployment/ha/zookeeper_ha.md| 4 +- .../docs/deployment/memory/mem_migration.md| 12 +- docs/content/docs/deployment/metric_reporters.md | 2 +- .../resource-providers/native_kubernetes.md| 4 +- .../resource-providers/standalone/docker.md| 14 +-- .../resource-providers/standalone/kubernetes.md| 38 +++ .../resource-providers/standalone/overview.md | 4 +- .../docs/deployment/resource-providers/yarn.md | 2 +- .../docs/deployment/security/security-ssl.md | 2 +- docs/content/docs/deployment/trace_reporters.md| 2 +- .../docs/dev/datastream/execution/parallel.md | 2 +- .../datastream/fault-tolerance/checkpointing.md| 2 +- .../datastream/fault-tolerance/state_backends.md | 2 +- .../docs/dev/python/environment_variables.md | 2 +- docs/content/docs/dev/table/catalogs.md| 4 +- docs/content/docs/dev/table/config.md | 4 +- .../docs/dev/table/sql-gateway/hiveserver2.md | 2 +- .../content/docs/dev/table/sql-gateway/overview.md | 4 +- docs/content/docs/dev/table/sqlClient.md | 2 +- 
docs/content/docs/ops/debugging/flame_graphs.md| 2 +- docs/content/docs/ops/metrics.md | 4 +- docs/content/docs/ops/rest_api.md | 2 +- docs/content/docs/ops/state/savepoints.md | 2 +- docs/content/docs/ops/state/state_backends.md | 24 ++-- .../docs/ops/state/task_failure_recovery.md| 8 +- .../docs/try-flink/flink-operations-playground.md | 2 +-
(flink) branch master updated: [FLINK-34323][table-planner] Fix named params in session window tvf
This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 6be30b16799 [FLINK-34323][table-planner] Fix named params in session window tvf
6be30b16799 is described below

commit 6be30b167990c22765c244a703ab0424e7c3b4d9
Author: Xuyang
AuthorDate: Mon Feb 5 10:06:03 2024 +0800

    [FLINK-34323][table-planner] Fix named params in session window tvf

    This closes #24243
---
 .../functions/sql/SqlSessionTableFunction.java     |   2 +-
 .../functions/sql/SqlWindowTableFunction.java      |   3 +
 .../plan/stream/sql/WindowTableFunctionTest.xml    | 154 ++---
 .../plan/stream/sql/WindowTableFunctionTest.scala  |  65 +
 4 files changed, 204 insertions(+), 20 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
index 454d1af6898..895dbeefe1a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
@@ -68,7 +68,7 @@ public class SqlSessionTableFunction extends SqlWindowTableFunction {
     /** Operand type checker for SESSION. */
     private static class OperandMetadataImpl extends AbstractOperandMetadata {
         OperandMetadataImpl() {
-            super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SIZE), 3);
+            super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, GAP), 3);
         }

         @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
index 0e7879a6dcc..3f22bed1907 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
@@ -63,6 +63,9 @@ public class SqlWindowTableFunction extends org.apache.calcite.sql.SqlWindowTabl
     /** The slide interval, only used for HOP window. */
     protected static final String PARAM_STEP = "STEP";

+    /** The gap interval, only used for SESSION window. */
+    protected static final String GAP = "GAP";
+
     /**
      * Type-inference strategy whereby the row type of a table function call is a ROW, which is
      * combined from the row type of operand #0 (which is a TABLE) and two additional fields. The
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
index fabcb77e269..bf45b5f6de5 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml
@@ -16,23 +16,18 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
[XML element content of this hunk was lost in extraction]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
index ff64550e35d..a364882fb93 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
@@ -257,4 +257,69 @@ class WindowTableFunctionTest extends TableTestBase {
     util.verifyRelPlan(sql)
   }

+  @Test
+  def testSessionTVF(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+        |""".stripMargin
+    util.verifyRelPlan(sql)
(flink) branch master updated: [FLINK-34229][table-runtime] Set CodeGeneratorContext of outer class as ancestor context when generate inner classes to avoid naming conflicts
This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new a603f2bb853 [FLINK-34229][table-runtime] Set CodeGeneratorContext of outer class as ancestor context when generate inner classes to avoid naming conflicts a603f2bb853 is described below commit a603f2bb8536fa75453694a77c66a76f2e33b941 Author: zoudan AuthorDate: Tue Jan 30 19:18:21 2024 +0800 [FLINK-34229][table-runtime] Set CodeGeneratorContext of outer class as ancestor context when generate inner classes to avoid naming conflicts This closes #24228 --- .../flink/table/planner/codegen/CodeGenUtils.scala | 6 ++--- .../codegen/agg/batch/HashAggCodeGenHelper.scala | 3 ++- .../codegen/sort/ComparatorCodeGenerator.scala | 31 +- .../planner/codegen/sort/SortCodeGenerator.scala | 17 +--- .../batch/sql/OperatorFusionCodegenITCase.scala| 13 + 5 files changed, 62 insertions(+), 8 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala index 52be23ed123..312bd0e0b58 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala @@ -332,7 +332,7 @@ object CodeGenUtils { s"Unsupported type($t) to generate hash code," + s" the type($t) is not supported as a GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field.") case ARRAY => -val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader) +val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader, ctx) val genHash = HashCodeGenerator.generateArrayHash( subCtx, @@ -340,7 +340,7 @@ object CodeGenUtils { "SubHashArray") genHashFunction(ctx, 
subCtx, genHash, term) case MULTISET | MAP => -val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader) +val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader, ctx) val (keyType, valueType) = t match { case multiset: MultisetType => (multiset.getElementType, new IntType()) @@ -353,7 +353,7 @@ object CodeGenUtils { case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)" case ROW | STRUCTURED_TYPE => val fieldCount = getFieldCount(t) -val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader) +val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader, ctx) val genHash = HashCodeGenerator.generateRowHash(subCtx, t, "SubHashRow", (0 until fieldCount).toArray) genHashFunction(ctx, subCtx, genHash, term) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala index 35e0613bddc..8453522d45c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala @@ -901,7 +901,8 @@ object HashAggCodeGenHelper { ctx.tableConfig, ctx.classLoader, aggMapKeyType, - SortUtil.getAscendingSortSpec(Array.range(0, aggMapKeyType.getFieldCount))) + SortUtil.getAscendingSortSpec(Array.range(0, aggMapKeyType.getFieldCount)), + ctx) val computer = sortCodeGenerator.generateNormalizedKeyComputer("AggMapKeyComputer") val comparator = sortCodeGenerator.generateRecordComparator("AggMapValueComparator") diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala index a14b7376b08..d6bda014ce3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala @@ -51,9 +51,38 @@ object ComparatorCodeGenerator { name: String, inputType: RowType, sortSpec: SortSpec): GeneratedRecordComparator = { +gen(tableConfig, classLoader, name, inputType, sortSpec, null) + } + + /** + * Generates a [[RecordComparator]] that can be passed to a
(flink) branch master updated: [FLINK-34313][doc][table] Add document for session window tvf
This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new eff073ff119 [FLINK-34313][doc][table] Add document for session window tvf
eff073ff119 is described below

commit eff073ff1199acf0f26a0f04ede7d692837301c3
Author: Xuyang
AuthorDate: Sun Feb 4 23:24:54 2024 +0800

    [FLINK-34313][doc][table] Add document for session window tvf

    This closes #24250
---
 .../docs/dev/table/sql/queries/window-agg.md  | 124 +++-
 .../docs/dev/table/sql/queries/window-join.md |   4 +
 .../docs/dev/table/sql/queries/window-topn.md |   4 +
 .../docs/dev/table/sql/queries/window-tvf.md  | 208 +
 docs/content.zh/docs/dev/table/tuning.md      |   2 +-
 .../docs/dev/table/sql/queries/window-agg.md  | 124 +++-
 .../docs/dev/table/sql/queries/window-join.md |   4 +
 .../docs/dev/table/sql/queries/window-topn.md |   4 +
 .../docs/dev/table/sql/queries/window-tvf.md  | 203
 docs/content/docs/dev/table/tuning.md         |   2 +-
 10 files changed, 504 insertions(+), 175 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/queries/window-agg.md b/docs/content.zh/docs/dev/table/sql/queries/window-agg.md
index f9395745202..ced9016bf41 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/window-agg.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/window-agg.md
@@ -40,11 +40,15 @@ GROUP BY window_start, window_end, ...
 ### Windowing TVFs

-Flink supports window aggregations on `TUMBLE`, `HOP`, and `CUMULATE`.
+Flink supports window aggregations on `TUMBLE`, `HOP`, `CUMULATE`, and `SESSION`.
 In streaming mode, the time attribute field of a windowing TVF must be [event time or processing time]({{< ref "docs/dev/table/concepts/time_attributes" >}}). See [Windowing TVF]({{< ref "docs/dev/table/sql/queries/window-tvf" >}}) for more information about windowing functions.
 In batch mode, the time attribute field of a windowing TVF must be of type `TIMESTAMP` or `TIMESTAMP_LTZ`.

-Here are some examples of `TUMBLE`, `HOP`, and `CUMULATE` window aggregations:
+{{< hint info >}}
+Note: `SESSION` window aggregation is currently not supported in batch mode.
+{{< /hint >}}
+
+Here are some examples of `TUMBLE`, `HOP`, `CUMULATE`, and `SESSION` window aggregations:

 ```sql
 -- tables must have time attribute, e.g. `bidtime` in this table
@@ -71,48 +75,74 @@ Flink SQL> SELECT * FROM Bid;
 +------------------+-------+------+-------------+

 -- tumbling window aggregation
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
   FROM TABLE(
     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
   GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-|     window_start |       window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+|     window_start |       window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
++------------------+------------------+-------------+

 -- hopping window aggregation
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
   FROM TABLE(
     HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
   GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-|     window_start |       window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
-| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
-| 2020-04-15 08:15 | 2020-04-15 08:25 |  6.00 |
-+------------------+------------------+-------+
++------------------+------------------+-------------+
+|     window_start |       window_end | total_price |
++------------------+------------------+-------------+
+| 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
+| 2020-04-15 08:05 | 2020-04-15 08:15 |       15.00 |
+| 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
+| 2020-04-15 08:15 | 2020-04-15 08:25 |        6.00 |
++------------------+------------------+-------------+

 -- cumulative window aggregation
-Flink SQL> SELECT window_start, window_end, SUM(price)
+Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
   FROM TABLE(
     CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
   GROUP BY window_start, window_end;
-+------------------+------------------+-------+
-|     window_start |       window_end | price |
-+------------------+------------------+-------+
-| 2020-04-15 08:00 | 2020-04-15 08:06 |  4.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:08 |  6.00 |
-| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
-| 2020-04-15 08:10
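The `SESSION` windows this commit documents group rows whose event times are separated by less than a session gap; a gap at or beyond the threshold closes the current window. A small plain-Java sketch of those assignment semantics (this is illustrative only, not Flink code; the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

class SessionWindows {
    // Groups ascending event timestamps (in ms) into session windows.
    // A gap of at least gapMs between consecutive events closes the
    // current window; each window is returned as {start, lastEvent + gapMs}.
    static List<long[]> assign(long[] timestamps, long gapMs) {
        List<long[]> windows = new ArrayList<>();
        if (timestamps.length == 0) {
            return windows;
        }
        long start = timestamps[0];
        long last = timestamps[0];
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i] - last >= gapMs) {
                windows.add(new long[] {start, last + gapMs});
                start = timestamps[i];
            }
            last = timestamps[i];
        }
        windows.add(new long[] {start, last + gapMs});
        return windows;
    }
}
```

For example, events at 0s, 1s, and 10s with a 5-second gap would form two sessions, because the 9-second pause exceeds the gap; in Flink SQL the equivalent grouping comes from the `SESSION` windowing TVF described in the window-tvf page above.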
(flink) branch master updated (60795b78dce -> 433b025b057)
This is an automated email from the ASF dual-hosted git repository.

jchan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 60795b78dce [FLINK-34217][docs] Update user doc for type serialization with FLIP-398
     add 433b025b057 [FLINK-34034][docs] Update the query hint docs to clarify the resolution of conflicts in kv hint and list hint

No new revisions were added by this update.

Summary of changes:
 .../content.zh/docs/dev/table/sql/queries/hints.md | 34
 docs/content/docs/dev/table/sql/queries/hints.md   | 37 ++
 2 files changed, 71 insertions(+)
(flink) branch master updated (5fe6f2024f7 -> 60795b78dce)
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 5fe6f2024f7 [FLINK-26369][doc-zh] Translate the part zh-page mixed with not be translated.
     add 60795b78dce [FLINK-34217][docs] Update user doc for type serialization with FLIP-398

No new revisions were added by this update.

Summary of changes:
 .../serialization/third_party_serializers.md | 40 -
 .../serialization/types_serialization.md     | 98 +++---
 2 files changed, 87 insertions(+), 51 deletions(-)