[flink-web] 01/02: Add Apache Flink release 1.9.3
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit af2d8de132b73f0dae6fd88912fe1c92202f3efe Author: Dian Fu AuthorDate: Fri Apr 17 20:29:14 2020 +0800 Add Apache Flink release 1.9.3 --- _config.yml| 56 --- _posts/2020-04-24-release-1.9.3.md | 136 + 2 files changed, 166 insertions(+), 26 deletions(-) diff --git a/_config.yml b/_config.yml index b28fa08..876e449 100644 --- a/_config.yml +++ b/_config.yml @@ -103,48 +103,48 @@ flink_releases: - version_short: 1.9 binary_release: - name: "Apache Flink 1.9.2" + name: "Apache Flink 1.9.3" scala_211: - id: "192-download_211" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz; - asc_url: "https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz.asc; - sha512_url: "https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz.sha512; + id: "193-download_211" + url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.11.tgz; + asc_url: "https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.11.tgz.asc; + sha512_url: "https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.11.tgz.sha512; scala_212: - id: "192-download_212" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.12.tgz; - asc_url: "https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.12.tgz.asc; - sha512_url: "https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.12.tgz.sha512; + id: "193-download_212" + url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.12.tgz; + asc_url: "https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.12.tgz.asc; + sha512_url: "https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.12.tgz.sha512; source_release: - name: "Apache Flink 1.9.2" - id: "192-download-source" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.9.2/flink-1.9.2-src.tgz; - asc_url: "https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-src.tgz.asc; - sha512_url: "https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-src.tgz.sha512; + name: "Apache Flink 1.9.3" + id: "193-download-source" + url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.9.3/flink-1.9.3-src.tgz; + asc_url: "https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-src.tgz.asc; + sha512_url: "https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-src.tgz.sha512; optional_components: - name: "Avro SQL Format" category: "SQL Formats" scala_dependent: false - id: 192-sql-format-avro - url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.2/flink-avro-1.9.2.jar - asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.2/flink-avro-1.9.2.jar.asc - sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.2/flink-avro-1.9.2.jar.sha1 + id: 193-sql-format-avro + url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.3/flink-avro-1.9.3.jar + asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.3/flink-avro-1.9.3.jar.asc + sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.3/flink-avro-1.9.3.jar.sha1 - name: "CSV SQL Format" category: "SQL Formats" scala_dependent: false - id: 192-sql-format-csv - url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.2/flink-csv-1.9.2.jar - asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.2/flink-csv-1.9.2.jar.asc - sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.2/flink-csv-1.9.2.jar.sha1 + id: 193-sql-format-csv + url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.3/flink-csv-1.9.3.jar + asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.3/flink-csv-1.9.3.jar.asc + sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.3/flink-csv-1.9.3.jar.sha1 - name: "JSON SQL Format" category: "SQL Formats" scala_dependent: false - id: 192-sql-format-json - url:
[flink-web] branch asf-site updated (66d0403 -> 155914a)
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 66d0403 Update status for Hequn to PMC in community web page new af2d8de Add Apache Flink release 1.9.3 new 155914a Rebuild website The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: _config.yml| 56 +- _posts/2020-04-24-release-1.9.3.md | 136 content/blog/feed.xml | 779 - content/blog/index.html| 40 +- content/blog/page10/index.html | 48 +- content/blog/page11/index.html | 29 + content/blog/page2/index.html | 40 +- content/blog/page3/index.html | 40 +- content/blog/page4/index.html | 40 +- content/blog/page5/index.html | 40 +- content/blog/page6/index.html | 38 +- content/blog/page7/index.html | 36 +- content/blog/page8/index.html | 36 +- content/blog/page9/index.html | 42 +- content/downloads.html | 29 +- content/index.html | 8 +- .../04/24/release-1.9.3.html} | 106 +-- content/zh/downloads.html | 33 +- content/zh/index.html | 8 +- 19 files changed, 689 insertions(+), 895 deletions(-) create mode 100644 _posts/2020-04-24-release-1.9.3.md copy content/news/{2018/08/21/release-1.5.3.html => 2020/04/24/release-1.9.3.html} (64%)
[flink] branch master updated: [FLINK-17333][doc] add doc for 'create catalog' ddl
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f6f2fb5 [FLINK-17333][doc] add doc for 'create catalog' ddl f6f2fb5 is described below commit f6f2fb56c3ee1d801f0db62fed357f918d40d4d0 Author: bowen.li AuthorDate: Wed Apr 22 22:05:54 2020 -0700 [FLINK-17333][doc] add doc for 'create catalog' ddl closes #11871 --- docs/dev/table/sql/create.md| 18 ++ docs/dev/table/sql/create.zh.md | 18 ++ 2 files changed, 36 insertions(+) diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md index 6e41eac..e44e2bc 100644 --- a/docs/dev/table/sql/create.md +++ b/docs/dev/table/sql/create.md @@ -210,6 +210,24 @@ The key and value of expression `key1=val1` should both be string literal. See d {% top %} +## CREATE CATALOG + +{% highlight sql %} +CREATE CATALOG catalog_name + WITH (key1=val1, key2=val2, ...) +{% endhighlight %} + +Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown. + +**WITH OPTIONS** + +Catalog properties used to store extra information related to this catalog. +The key and value of expression `key1=val1` should both be string literal. + +Check out more details at [Catalogs]({{ site.baseurl }}/dev/table/catalogs.html). + +{% top %} + ## CREATE DATABASE {% highlight sql %} diff --git a/docs/dev/table/sql/create.zh.md b/docs/dev/table/sql/create.zh.md index a696912..5c6099a 100644 --- a/docs/dev/table/sql/create.zh.md +++ b/docs/dev/table/sql/create.zh.md @@ -210,6 +210,24 @@ CREATE TABLE Orders ( {% top %} +## CREATE CATALOG + +{% highlight sql %} +CREATE CATALOG catalog_name + WITH (key1=val1, key2=val2, ...) +{% endhighlight %} + +Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown. + +**WITH OPTIONS** + +Catalog properties used to store extra information related to this catalog. +The key and value of expression `key1=val1` should both be string literal. + +Check out more details at [Catalogs]({{ site.baseurl }}/dev/table/catalogs.html). + +{% top %} + ## CREATE DATABASE {% highlight sql %}
[flink] branch master updated: [FLINK-16812][jdbc] support array types in PostgresRowConverter
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4121292 [FLINK-16812][jdbc] support array types in PostgresRowConverter 4121292 is described below commit 4121292fbb63dacef29245a2234da68fa499efa6 Author: bowen.li AuthorDate: Wed Apr 15 22:26:22 2020 -0700 [FLINK-16812][jdbc] support array types in PostgresRowConverter closes #11766 --- .../row/converter/AbstractJDBCRowConverter.java| 5 +- .../source/row/converter/JDBCRowConverter.java | 2 +- .../source/row/converter/PostgresRowConverter.java | 43 +++ .../io/jdbc/catalog/PostgresCatalogITCase.java | 26 + .../java/io/jdbc/catalog/PostgresCatalogTest.java | 9 +++- .../io/jdbc/catalog/PostgresCatalogTestBase.java | 62 ++ 6 files changed, 144 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java index 06a6329..abe753f 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java @@ -54,7 +54,10 @@ public abstract class AbstractJDBCRowConverter implements JDBCRowConverter { return reuse; } - private JDBCFieldConverter createConverter(LogicalType type) { + /** +* Create a runtime JDBC field converter from given {@link LogicalType}. +*/ + public JDBCFieldConverter createConverter(LogicalType type) { LogicalTypeRoot root = type.getTypeRoot(); if (root == LogicalTypeRoot.SMALLINT) { diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java index 3f997b4..e89fbc3 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java @@ -43,6 +43,6 @@ public interface JDBCRowConverter extends Serializable { */ @FunctionalInterface interface JDBCFieldConverter extends Serializable { - Object convert(Object value); + Object convert(Object value) throws SQLException; } } diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java index 102f079..82b6be8 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java @@ -18,7 +18,15 @@ package org.apache.flink.api.java.io.jdbc.source.row.converter; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; + +import org.postgresql.jdbc.PgArray; +import org.postgresql.util.PGobject; /** * Row converter for Postgres. @@ -28,4 +36,39 @@ public class PostgresRowConverter extends AbstractJDBCRowConverter { public PostgresRowConverter(RowType rowType) { super(rowType); } + + @Override + public JDBCFieldConverter createConverter(LogicalType type) { + LogicalTypeRoot root = type.getTypeRoot(); + + if (root == LogicalTypeRoot.ARRAY) { + ArrayType arrayType = (ArrayType) type; + + // PG's bytea[] is wrapped in PGobject, rather than primitive byte arrays + if (LogicalTypeChecks.hasFamily(arrayType.getElementType(), LogicalTypeFamily.BINARY_STRING)) { + + return v -> { + PgArray pgArray = (PgArray) v; + Object[] in = (Object[]) pgArray.getArray(); + + Object[] out = new
[flink] branch master updated (c4b44e9 -> 6128bd1)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c4b44e9 [FLINK-13639] Refactor the IntermediateResultPartitionID to consist of IntermediateDataSetID and partitionIndex add 6128bd1 [FLINK-17175][core] StringUtils.arrayToString() should consider Object[] lastly No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/flink/util/StringUtils.java | 6 +++--- .../src/test/java/org/apache/flink/util/StringUtilsTest.java | 11 +++ 2 files changed, 14 insertions(+), 3 deletions(-)
[flink] annotated tag release-1.10.1-rc1 updated (84b74cc -> ee391fd)
This is an automated email from the ASF dual-hosted git repository. liyu pushed a change to annotated tag release-1.10.1-rc1 in repository https://gitbox.apache.org/repos/asf/flink.git. *** WARNING: tag release-1.10.1-rc1 was modified! *** from 84b74cc (commit) to ee391fd (tag) tagging 84b74cc0e21981bf6feceb74b48d7a9d3e215dc5 (commit) replaces pre-apache-rename by Yu Li on Fri Apr 24 15:57:51 2020 +0800 - Log - release-1.10.1-rc1 -BEGIN PGP SIGNATURE- Version: GnuPG v2.0.22 (GNU/Linux) iQIcBAABAgAGBQJeopv/AAoJEJPAeQJ3GrdDacgP/0GlJzrzRLIf3n4oVTr7ZpCl m769HJCepjWoo5IlRl7PI5f7xdtRsB+145kOOvMAoru4X7GZzCLLoY6ZxDEIEqv6 xt/1SW78LxqqnfZdHBoMLM4IFzW1JWHT4q2sUjM2GfDly2UPbgQKtOul7bCzzytN bEo9KwOGcsKmsT3RyVZ4ybJ5TGZ+aQkTSjPbwKbzUXhj62OlCCeHDvpovo6z70zW EN3ZSNe/cRqDBpFOSQ72cCL1frNOt6g0Y4ip4ivEZSN9LG1HWCa5p9c0MsU1wUzW hYLm+hjFT0AzMhiD1Yxe6oHzE/RlNs7WedCWwHs6SHPamMwRfhnLKnj2Ua0NJXmK rCWYpPgfp0qfm7ti5hmUa75UxGzCbCnV0CD8UYsXVP5LBAK40khhP3QM2aQeBSzR JHGykcNfKjqmuloZV06QjfljrRsARpMm0aEb7WzLMQdxpjp1EYUuFBKGaF0QAiZx fAF0JYvsEjKu6r1t4sDlDUd6iOZsfIYAkrTRB3qkkg09Bk7+RuwjRM+53jTF1TwY lJmhR6PPrhGmdFBrMRp+0PLNXUofClqFcqYI/6zbfoI5zcUd5Ix0cIVRMa2cqJ7D IpD+VxxeqyuTgFWD/AW3WcTcCNRDJkXMQcR6Am2fODfQxTgUs0L7+ftS7lGPjMYb cP8nDLVJWZQAtnHmpC9j =N9Qu -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r39106 - /dev/flink/flink-1.10.1-rc1/
Author: liyu Date: Fri Apr 24 16:32:13 2020 New Revision: 39106 Log: Add flink-1.10.1-rc1 Added: dev/flink/flink-1.10.1-rc1/ dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz (with props) dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.asc (with props) dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.sha512 dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz (with props) dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.asc (with props) dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.sha512 dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz (with props) dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.asc (with props) dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.sha512 dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz (with props) dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz.asc (with props) dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz.sha512 Added: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz == Binary file - no diff available. Propchange: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz -- svn:mime-type = application/x-gzip Added: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.asc == Binary file - no diff available. Propchange: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.asc -- svn:mime-type = application/pgp-signature Added: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.sha512 == --- dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.sha512 (added) +++ dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.sha512 Fri Apr 24 16:32:13 2020 @@ -0,0 +1 @@ +2aa83b6cf9888bae6215e376379ab4bd3f9370b708e1bb4b6e526f4099116777b66c8aaad77986a02560999dbe93d0f3079c5903b365322fba085eb76e4afb98 apache-flink-1.10.1.tar.gz Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz == Binary file - no diff available. Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz -- svn:mime-type = application/x-gzip Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.asc == Binary file - no diff available. Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.asc -- svn:mime-type = application/pgp-signature Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.sha512 == --- dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.sha512 (added) +++ dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.sha512 Fri Apr 24 16:32:13 2020 @@ -0,0 +1 @@ +6fcbc3a9e3aa86e21c6a63ac66bb2522fb826e305ff72b77172fde70d7c5a000a2a94fe0cdd3d8b1d7fb2d2367eec0a8105cbfddc86c48aceabbfeece151c92a flink-1.10.1-bin-scala_2.11.tgz Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz == Binary file - no diff available. Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz -- svn:mime-type = application/x-gzip Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.asc == Binary file - no diff available. Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.asc -- svn:mime-type = application/pgp-signature Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.sha512 == --- dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.sha512 (added) +++ dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.sha512 Fri Apr 24 16:32:13 2020 @@ -0,0 +1 @@ +b71a0acff9c228c49df3fff0e011b8ee59e7f54e3436b43e0f0db7582676c2e6bc5dd11a8860e4b02125d4e8cd11fee64120022e1a961315ebd910f66d7543b1 flink-1.10.1-bin-scala_2.12.tgz Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz == Binary file - no diff available. Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz
[flink] branch master updated: [FLINK-13639] Refactor the IntermediateResultPartitionID to consist of IntermediateDataSetID and partitionIndex
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c4b44e9 [FLINK-13639] Refactor the IntermediateResultPartitionID to consist of IntermediateDataSetID and partitionIndex c4b44e9 is described below commit c4b44e9d4a7af9e841f785fcf0e4831f4cd5afe9 Author: Yangze Guo AuthorDate: Wed Apr 22 14:52:21 2020 +0800 [FLINK-13639] Refactor the IntermediateResultPartitionID to consist of IntermediateDataSetID and partitionIndex --- .../IntermediateResultPartition.java | 2 +- .../runtime/io/network/netty/NettyMessage.java | 4 +- .../runtime/jobgraph/IntermediateDataSetID.java| 17 +++ .../jobgraph/IntermediateResultPartitionID.java| 54 +- .../apache/flink/runtime/topology/ResultID.java| 2 +- 5 files changed, 64 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java index 5cdd376b..78bd9c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java @@ -46,7 +46,7 @@ public class IntermediateResultPartition { this.producer = producer; this.partitionNumber = partitionNumber; this.consumers = new ArrayList>(0); - this.partitionId = new IntermediateResultPartitionID(); + this.partitionId = new IntermediateResultPartitionID(totalResult.getId(), partitionNumber); } public ExecutionVertex getProducer() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java index 2ed875d..5523cf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java @@ -508,7 +508,7 @@ public abstract class NettyMessage { ByteBuf result = null; try { - result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16 + 4); + result = allocateBuffer(allocator, ID, 20 + 16 + 4 + 16 + 4); partitionId.getPartitionId().writeTo(result); partitionId.getProducerId().writeTo(result); @@ -569,7 +569,7 @@ public abstract class NettyMessage { // TODO Directly serialize to Netty's buffer ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event); - result = allocateBuffer(allocator, ID, 4 + serializedEvent.remaining() + 16 + 16 + 16); + result = allocateBuffer(allocator, ID, 4 + serializedEvent.remaining() + 20 + 16 + 16); result.writeInt(serializedEvent.remaining()); result.writeBytes(serializedEvent); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java index c22ca1e..b34e3f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.jobgraph; import org.apache.flink.runtime.topology.ResultID; import org.apache.flink.util.AbstractID; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + import java.util.UUID; /** @@ -54,4 +56,19 @@ public class IntermediateDataSetID extends AbstractID implements ResultID { public IntermediateDataSetID(UUID from) { super(from.getLeastSignificantBits(), from.getMostSignificantBits()); } + + private IntermediateDataSetID(long lower, long upper) { + super(lower, upper); + } + + public void writeTo(ByteBuf buf) { + buf.writeLong(lowerPart); + buf.writeLong(upperPart); + } + + public static IntermediateDataSetID fromByteBuf(ByteBuf buf) { + final long lower = buf.readLong(); + final long upper = buf.readLong(); + return new IntermediateDataSetID(lower, upper); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateResultPartitionID.java
[flink] branch master updated (90fed72 -> deb1268)
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 90fed72 [FLINK-17345][python][table] Support register and get Python UDF in Catalog (#11884) add deb1268 [FLINK-17021][table-planner-blink] Blink batch planner set GlobalDataExchangeMode No new revisions were added by this update. Summary of changes: .../generated/execution_config_configuration.html | 6 +- .../apache/flink/table/tpcds/TpcdsTestProgram.java | 3 - .../table/api/config/ExecutionConfigOptions.java | 39 -- .../flink/table/planner/utils/ExecutorUtils.java | 23 ++ .../table/planner/utils/ShuffleModeUtils.java | 55 ++ .../nodes/physical/batch/BatchExecExchange.scala | 4 +- .../table/planner/utils/ShuffleModeUtilsTest.java | 86 ++ .../planner/runtime/utils/BatchTestBase.scala | 6 +- .../flink/table/planner/utils/TableTestBase.scala | 6 +- 9 files changed, 197 insertions(+), 31 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShuffleModeUtils.java create mode 100644 flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/ShuffleModeUtilsTest.java
[flink] branch master updated (efd22a7 -> 90fed72)
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from efd22a7 [hotfix][filesystems] Remove unused StopWatch add 90fed72 [FLINK-17345][python][table] Support register and get Python UDF in Catalog (#11884) No new revisions were added by this update. Summary of changes: .../flink/table/catalog/hive/HiveCatalog.java | 24 +++- .../catalog/hive/HiveCatalogMetadataTestBase.java | 6 .../client/python/PythonFunctionFactoryTest.java | 16 +++ .../table/api/internal/TableEnvironmentImpl.java | 4 --- .../flink/table/catalog/CatalogFunctionImpl.java | 3 ++ .../flink/table/catalog/FunctionCatalog.java | 6 ++-- .../table/catalog/GenericInMemoryCatalogTest.java | 5 .../apache/flink/table/catalog/CatalogTest.java| 32 ++ .../operations/SqlToOperationConverter.java| 6 .../table/sqlexec/SqlToOperationConverter.java | 6 .../flink/table/api/internal/TableEnvImpl.scala| 2 -- 11 files changed, 82 insertions(+), 28 deletions(-)
[flink] 05/05: [hotfix][filesystems] Remove unused StopWatch
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit efd22a7cfc53d083b6c02941641d8f17f1c767ea Author: Gary Yao AuthorDate: Thu Apr 23 19:00:49 2020 +0200 [hotfix][filesystems] Remove unused StopWatch --- .../flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java| 4 1 file changed, 4 deletions(-) diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java index 05e8cd0..adf123f 100644 --- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java @@ -29,7 +29,6 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; -import org.apache.commons.lang3.time.StopWatch; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -332,9 +331,6 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT)); - final StopWatch sw = new StopWatch(); - sw.start(); - boolean isClosed = dfs.isFileClosed(path); while (!isClosed && deadline.hasTimeLeft()) { try {
[flink] 01/05: [hotfix][runtime] Throw exception from TestingSchedulingPipelinedRegion#getVertex() for unknown vertices
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit fd13c2c79002e436df3c430e79d010f13b567e2d Author: Gary Yao AuthorDate: Tue Apr 21 15:40:11 2020 +0200 [hotfix][runtime] Throw exception from TestingSchedulingPipelinedRegion#getVertex() for unknown vertices --- .../scheduler/strategy/TestingSchedulingPipelinedRegion.java| 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java index 34450a7..2166fdc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java @@ -52,7 +52,11 @@ public class TestingSchedulingPipelinedRegion implements SchedulingPipelinedRegi @Override public TestingSchedulingExecutionVertex getVertex(ExecutionVertexID vertexId) { - return regionVertices.get(vertexId); + final TestingSchedulingExecutionVertex executionVertex = regionVertices.get(vertexId); + if (executionVertex == null) { + throw new IllegalArgumentException(String.format("Execution vertex %s not found in pipelined region", vertexId)); + } + return executionVertex; } @Override
[flink] 03/05: [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RegionPartitionReleaseStrategy
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 95b3c955f115dacb58b9695ae4192f729f5d5662 Author: Gary Yao AuthorDate: Tue Apr 21 15:23:40 2020 +0200 [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RegionPartitionReleaseStrategy Avoid re-computing pipelined regions in the RegionPartitionReleaseStrategy using the PipelinedRegionComputeUtil. Instead, rely on the pipelined regions provided by the Topology. --- .../failover/flip1/PipelinedRegionComputeUtil.java | 20 - .../flip1/partitionrelease/PipelinedRegion.java| 69 --- .../PipelinedRegionConsumedBlockingPartitions.java | 51 .../PipelinedRegionExecutionView.java | 24 +++--- .../RegionPartitionReleaseStrategy.java| 97 -- .../RegionPartitionReleaseStrategyTest.java| 38 + .../PipelinedRegionExecutionViewTest.java | 21 +++-- 7 files changed, 45 insertions(+), 275 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java index aa94841..52d96f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java @@ -19,9 +19,6 @@ package org.apache.flink.runtime.executiongraph.failover.flip1; -import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion; -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.topology.BaseTopology; import org.apache.flink.runtime.topology.Result; import org.apache.flink.runtime.topology.Vertex; @@ -34,8 +31,6 @@ import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Map; import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; /** * Utility for computing pipelined regions. @@ -44,21 +39,6 @@ public final class PipelinedRegionComputeUtil { private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class); - public static Set toPipelinedRegionsSet( - final Set> distinctRegions) { - - return distinctRegions.stream() - .map(toExecutionVertexIdSet()) - .map(PipelinedRegion::from) - .collect(Collectors.toSet()); - } - - private static Function, Set> toExecutionVertexIdSet() { - return failoverVertices -> failoverVertices.stream() - .map(SchedulingExecutionVertex::getId) - .collect(Collectors.toSet()); - } - public static , R extends Result> Set> computePipelinedRegions( final BaseTopology topology) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java deleted file mode 100644 index 36c042e..000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; - -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Set of execution vertices that are connected through pipelined intermediate result partitions. - */
[flink] 02/05: [FLINK-17180][runtime] Implement SchedulingPipelinedRegion interface
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 23c13bbdbfa1b538a6c9e4e9622ef4563f69cd03 Author: Gary Yao AuthorDate: Mon Apr 20 19:17:07 2020 +0200 [FLINK-17180][runtime] Implement SchedulingPipelinedRegion interface Implement interfaces - Toplogy#getAllPipelinedRegions() - Topology#getPipelinedRegionOfVertex(ExecutionVertexID) in DefaultExecutionTopology to enable retrieval of pipelined regions. --- .../adapter/DefaultExecutionTopology.java | 49 .../adapter/DefaultSchedulingPipelinedRegion.java | 85 ++ .../adapter/DefaultExecutionTopologyTest.java | 21 .../DefaultSchedulingPipelinedRegionTest.java | 127 + 4 files changed, 282 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java index 63bd39f..d7eb54e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java @@ -23,16 +23,23 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -41,6 +48,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class DefaultExecutionTopology implements SchedulingTopology { + private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionTopology.class); + private final boolean containsCoLocationConstraints; private final Map executionVerticesById; @@ -49,6 +58,10 @@ public class DefaultExecutionTopology implements SchedulingTopology { private final Map resultPartitionsById; + private final Map pipelinedRegionsByVertex; + + private final List pipelinedRegions; + public DefaultExecutionTopology(ExecutionGraph graph) { checkNotNull(graph, "execution graph can not be null"); @@ -74,6 +87,28 @@ public class DefaultExecutionTopology implements SchedulingTopology { this.resultPartitionsById = tmpResultPartitionsById; connectVerticesToConsumedPartitions(executionVertexMap, tmpResultPartitionsById); + + this.pipelinedRegionsByVertex = new HashMap<>(); + this.pipelinedRegions = new ArrayList<>(); + initializePipelinedRegions(); + } + + private void initializePipelinedRegions() { + final long buildRegionsStartTime = System.nanoTime(); + + final Set> rawPipelinedRegions = PipelinedRegionComputeUtil.computePipelinedRegions(this); + for (Set rawPipelinedRegion : rawPipelinedRegions) { + //noinspection unchecked + final DefaultSchedulingPipelinedRegion pipelinedRegion = new DefaultSchedulingPipelinedRegion((Set) rawPipelinedRegion); + pipelinedRegions.add(pipelinedRegion); + + for (SchedulingExecutionVertex executionVertex : rawPipelinedRegion) { + pipelinedRegionsByVertex.put(executionVertex.getId(), pipelinedRegion); + } + } + + final long buildRegionsDuration = (System.nanoTime() - buildRegionsStartTime) / 1_000_000; + LOG.info("Built {} pipelined regions in {} ms", pipelinedRegions.size(), buildRegionsDuration); } @Override @@ -104,6 +139,20 @@ public class DefaultExecutionTopology implements SchedulingTopology { return resultPartition; } + @Override + public Iterable getAllPipelinedRegions() { + return Collections.unmodifiableCollection(pipelinedRegions); + } + +
[flink] branch master updated (8514200 -> efd22a7)
This is an automated email from the ASF dual-hosted git repository. gary pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 8514200 [FLINK-17308] Add regular cleanup task for ExecutionGraphCache new fd13c2c [hotfix][runtime] Throw exception from TestingSchedulingPipelinedRegion#getVertex() for unknown vertices new 23c13bb [FLINK-17180][runtime] Implement SchedulingPipelinedRegion interface new 95b3c95 [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RegionPartitionReleaseStrategy new f9c23a0 [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RestartPipelinedRegionFailoverStrategy new efd22a7 [hotfix][filesystems] Remove unused StopWatch The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../hdfs/HadoopRecoverableFsDataOutputStream.java | 4 - .../failover/flip1/FailoverRegion.java | 68 --- .../failover/flip1/PipelinedRegionComputeUtil.java | 20 .../RestartPipelinedRegionFailoverStrategy.java| 63 +++--- .../flip1/partitionrelease/PipelinedRegion.java| 69 --- .../PipelinedRegionConsumedBlockingPartitions.java | 51 - .../PipelinedRegionExecutionView.java | 24 ++-- .../RegionPartitionReleaseStrategy.java| 97 +++- .../adapter/DefaultExecutionTopology.java | 49 .../adapter/DefaultSchedulingPipelinedRegion.java | 85 ++ .../RegionPartitionReleaseStrategyTest.java| 38 +- ...ipelinedRegionFailoverStrategyBuildingTest.java | 125 ++-- .../PipelinedRegionExecutionViewTest.java | 21 ++-- .../adapter/DefaultExecutionTopologyTest.java | 21 .../DefaultSchedulingPipelinedRegionTest.java | 127 + .../strategy/TestingSchedulingPipelinedRegion.java | 6 +- 16 files changed, 410 insertions(+), 458 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java
[flink] 04/05: [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RestartPipelinedRegionFailoverStrategy
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f9c23a0b86121d6361df403a05f75ba4b3902735 Author: Gary Yao AuthorDate: Tue Apr 21 15:59:27 2020 +0200 [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RestartPipelinedRegionFailoverStrategy Avoid re-computing pipelined regions in the RestartPipelinedRegionFailoverStrategy using the PipelinedRegionComputeUtil. Instead, rely on the pipelined regions provided by the Topology. This closes #11857. --- .../failover/flip1/FailoverRegion.java | 68 --- .../RestartPipelinedRegionFailoverStrategy.java| 63 +++ ...ipelinedRegionFailoverStrategyBuildingTest.java | 125 +++-- 3 files changed, 78 insertions(+), 178 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java deleted file mode 100644 index d1efb6f..000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph.failover.flip1; - -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; - -import java.util.HashSet; -import java.util.Set; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * FailoverRegion is a subset of all the vertices in the job topology. - */ -public class FailoverRegion { - - /** All vertex IDs in this region. */ - private final Set executionVertexIDs; - - /** All vertices in this region. */ - private final Set executionVertices; - - /** -* Creates a new failover region containing a set of vertices. -* -* @param executionVertices to be contained in this region -*/ - public FailoverRegion(Set executionVertices) { - this.executionVertices = checkNotNull(executionVertices); - this.executionVertexIDs = new HashSet<>(); - executionVertices.forEach(v -> this.executionVertexIDs.add(v.getId())); - } - - /** -* Returns IDs of all vertices in this region. -* -* @return IDs of all vertices in this region -*/ - public Set getAllExecutionVertexIDs() { - return executionVertexIDs; - } - - /** -* Returns all vertices in this region. -* -* @return all vertices in this region -*/ - public Set getAllExecutionVertices() { - return executionVertices; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java index 3c158f0..eb8d06b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionException; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion; import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.util.ExceptionUtils; @@ -31,10 +32,8 @@ import org.slf4j.LoggerFactory; import java.util.ArrayDeque; import java.util.Collections; -import
[flink] branch release-1.9 updated (8c8fa36 -> 244ca66)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git. from 8c8fa36 [FLINK-17215][python][docs] Clean the build document for Pyflink (#11792) add 513d7b7 [hotfix] Remove unused docs.rest.NoOpTransientBlobService add 509da3a [hotfix] Make NoOpTransientBlobService a public testing class add aa6966b [FLINK-17308] Pass ScheduledExecutorService to WebMonitorEndpoint add eb75338 [FLINK-17308] Pass in ExecutionGraphCache to WebMonitorEndpoint add d90ec14 [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache add 244ca66 [FLINK-17308] Add regular cleanup task for ExecutionGraphCache No new revisions were added by this update. Summary of changes: .../runtime/dispatcher/DispatcherRestEndpoint.java | 7 +- ...tDispatcherResourceManagerComponentFactory.java | 4 +- .../jobmaster/MiniDispatcherRestEndpoint.java | 7 +- .../flink/runtime/rest/JobRestEndpointFactory.java | 5 +- .../flink/runtime/rest/RestEndpointFactory.java| 14 ++- .../runtime/rest/SessionRestEndpointFactory.java | 5 +- ...hCache.java => DefaultExecutionGraphCache.java} | 27 + .../rest/handler/legacy/ExecutionGraphCache.java | 128 ++--- .../runtime/webmonitor/WebMonitorEndpoint.java | 35 -- .../runtime/blob}/NoOpTransientBlobService.java| 10 +- .../SubtaskCurrentAttemptDetailsHandlerTest.java | 4 +- ...askExecutionAttemptAccumulatorsHandlerTest.java | 4 +- .../SubtaskExecutionAttemptDetailsHandlerTest.java | 4 +- ...st.java => DefaultExecutionGraphCacheTest.java} | 18 +-- .../util/DocumentingDispatcherRestEndpoint.java| 58 +- .../rest/util/NoOpExecutionGraphCache.java}| 28 +++-- .../webmonitor/TestingExecutionGraphCache.java | 113 ++ .../runtime/webmonitor/WebMonitorEndpointTest.java | 79 + 18 files changed, 307 insertions(+), 243 deletions(-) copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCache.java => DefaultExecutionGraphCache.java} (83%) rename {flink-docs/src/main/java/org/apache/flink/docs/rest => flink-runtime/src/test/java/org/apache/flink/runtime/blob}/NoOpTransientBlobService.java (84%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCacheTest.java => DefaultExecutionGraphCacheTest.java} (93%) copy flink-runtime/src/{main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java => test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java} (52%) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
[flink] 02/02: [FLINK-17308] Add regular cleanup task for ExecutionGraphCache
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 74c29879810c64100534e7892b5edc6e03bdc2d9 Author: Till Rohrmann AuthorDate: Thu Apr 23 10:53:27 2020 +0200 [FLINK-17308] Add regular cleanup task for ExecutionGraphCache The WebMonitorEndpoint now schedules are regular cleanup task which runs every 2 * WebOptions.REFRESH_INTERVAL and tries to clean up expired ExecutionGraphCache entries. This ensures that we will remove unused entries. This closes #11882. --- .../runtime/webmonitor/WebMonitorEndpoint.java | 20 .../webmonitor/TestingExecutionGraphCache.java | 113 + .../runtime/webmonitor/WebMonitorEndpointTest.java | 79 ++ 3 files changed, 212 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 3d9e94e..ad98cc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -127,6 +127,7 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; @@ -138,6 +139,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -167,6 +169,9 @@ public class WebMonitorEndpoint extends RestServerEndp private final Collection archivingHandlers = new ArrayList<>(16); + @Nullable + private ScheduledFuture executionGraphCleanupTask; + public WebMonitorEndpoint( RestServerEndpointConfiguration endpointConfiguration, GatewayRetriever leaderRetriever, @@ -682,13 +687,28 @@ public class WebMonitorEndpoint extends RestServerEndp @Override public void startInternal() throws Exception { leaderElectionService.start(this); + startExecutionGraphCacheCleanupTask(); + if (hasWebUI) { log.info("Web frontend listening at {}.", getRestBaseUrl()); } } + private void startExecutionGraphCacheCleanupTask() { + final long cleanupInterval = 2 * restConfiguration.getRefreshInterval(); + executionGraphCleanupTask = executor.scheduleWithFixedDelay( + executionGraphCache::cleanup, + cleanupInterval, + cleanupInterval, + TimeUnit.MILLISECONDS); + } + @Override protected CompletableFuture shutDownInternal() { + if (executionGraphCleanupTask != null) { + executionGraphCleanupTask.cancel(false); + } + executionGraphCache.close(); final CompletableFuture shutdownFuture = FutureUtils.runAfterwards( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java new file mode 100644 index 000..a61bdc3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; + +import java.util.concurrent.CompletableFuture; +import
[flink] branch release-1.10 updated (0e2b520 -> 74c2987)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 0e2b520 [FLINK-17338] Increase timeouts on LocalExecutorITCase add 0157501 [hotfix] Remove unused docs.rest.NoOpTransientBlobService add 85a9e7e [hotfix] Make NoOpTransientBlobService a public testing class add 138b350 [FLINK-17308] Pass ScheduledExecutorService to WebMonitorEndpoint add c4ec62f [FLINK-17308] Pass in ExecutionGraphCache to WebMonitorEndpoint new ba6b23b [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache new 74c2987 [FLINK-17308] Add regular cleanup task for ExecutionGraphCache The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../runtime/dispatcher/DispatcherRestEndpoint.java | 7 +- ...tDispatcherResourceManagerComponentFactory.java | 4 +- .../jobmaster/MiniDispatcherRestEndpoint.java | 7 +- .../flink/runtime/rest/JobRestEndpointFactory.java | 5 +- .../flink/runtime/rest/RestEndpointFactory.java| 14 ++- .../runtime/rest/SessionRestEndpointFactory.java | 5 +- ...hCache.java => DefaultExecutionGraphCache.java} | 27 + .../rest/handler/legacy/ExecutionGraphCache.java | 128 ++--- .../runtime/webmonitor/WebMonitorEndpoint.java | 35 -- .../runtime/blob}/NoOpTransientBlobService.java| 10 +- .../rest/handler/job/JobConfigHandlerTest.java | 4 +- .../rest/handler/job/JobExceptionsHandlerTest.java | 4 +- .../SubtaskCurrentAttemptDetailsHandlerTest.java | 4 +- ...askExecutionAttemptAccumulatorsHandlerTest.java | 4 +- .../SubtaskExecutionAttemptDetailsHandlerTest.java | 4 +- .../metrics/JobVertexWatermarksHandlerTest.java| 4 +- ...st.java => DefaultExecutionGraphCacheTest.java} | 18 +-- .../util/DocumentingDispatcherRestEndpoint.java| 58 +- .../util/NoOpExecutionGraphCache.java} | 27 +++-- .../webmonitor/TestingExecutionGraphCache.java | 113 ++ .../runtime/webmonitor/WebMonitorEndpointTest.java | 79 + 21 files changed, 308 insertions(+), 253 deletions(-) copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCache.java => DefaultExecutionGraphCache.java} (83%) rename {flink-docs/src/main/java/org/apache/flink/docs/rest => flink-runtime/src/test/java/org/apache/flink/runtime/blob}/NoOpTransientBlobService.java (84%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCacheTest.java => DefaultExecutionGraphCacheTest.java} (93%) copy flink-runtime/src/test/java/org/apache/flink/runtime/{dispatcher/NoOpJobGraphWriter.java => rest/util/NoOpExecutionGraphCache.java} (52%) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
[flink] 01/02: [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit ba6b23bc13cbe391a04089856cc6a64f4555f4f9 Author: Till Rohrmann AuthorDate: Thu Apr 23 10:38:15 2020 +0200 [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache This commit extracts the ExecutionGraphCache interface from the implementation and renames the latter into DefaultExecutionGraphCache. Moreover, it introduces the NoOpExecutionGraphCache implementation which is used for the DocumentingDispatcherRestEndpoint. --- .../flink/runtime/rest/RestEndpointFactory.java| 3 +- ...hCache.java => DefaultExecutionGraphCache.java} | 27 + .../rest/handler/legacy/ExecutionGraphCache.java | 128 ++--- .../rest/handler/job/JobConfigHandlerTest.java | 4 +- .../rest/handler/job/JobExceptionsHandlerTest.java | 4 +- .../SubtaskCurrentAttemptDetailsHandlerTest.java | 4 +- ...askExecutionAttemptAccumulatorsHandlerTest.java | 4 +- .../SubtaskExecutionAttemptDetailsHandlerTest.java | 4 +- .../metrics/JobVertexWatermarksHandlerTest.java| 4 +- ...st.java => DefaultExecutionGraphCacheTest.java} | 18 +-- .../util/DocumentingDispatcherRestEndpoint.java| 3 +- .../runtime/rest/util/NoOpExecutionGraphCache.java | 50 12 files changed, 89 insertions(+), 164 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java index d148c24..b2e21cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -52,7 +53,7 @@ public interface RestEndpointFactory { FatalErrorHandler fatalErrorHandler) throws Exception; static ExecutionGraphCache createExecutionGraphCache(RestHandlerConfiguration restConfiguration) { - return new ExecutionGraphCache( + return new DefaultExecutionGraphCache( restConfiguration.getTimeout(), Time.milliseconds(restConfiguration.getRefreshInterval())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java similarity index 83% copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java index f634a62..a01704e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.util.Preconditions; -import java.io.Closeable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -33,11 +32,9 @@ import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink cluster. Every cache entry - * has an associated time to live after which a new request will trigger the reloading of the - * {@link ArchivedExecutionGraph} from the cluster. + * Default implementation of {@link ExecutionGraphCache}. */ -public class ExecutionGraphCache implements Closeable { +public class DefaultExecutionGraphCache implements ExecutionGraphCache { private final Time timeout; @@ -47,7 +44,7 @@ public class ExecutionGraphCache implements Closeable { private volatile boolean running = true; - public ExecutionGraphCache( + public DefaultExecutionGraphCache( Time timeout, Time timeToLive) { this.timeout = checkNotNull(timeout); @@
[flink] branch master updated (948a42f -> 8514200)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 948a42f [FLINK-17344][test] Fix unstability on IdleTime metric test. new e48fac8 [hotfix] Remove unused docs.rest.NoOpTransientBlobService new 5cfa396 [hotfix] Make NoOpTransientBlobService a public testing class new bf0cfa6 [FLINK-17308] Pass ScheduledExecutorService to WebMonitorEndpoint new 3635b54 [FLINK-17308] Pass in ExecutionGraphCache to WebMonitorEndpoint new 08025ec [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache new 8514200 [FLINK-17308] Add regular cleanup task for ExecutionGraphCache The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../runtime/dispatcher/DispatcherRestEndpoint.java | 7 +- ...tDispatcherResourceManagerComponentFactory.java | 4 +- .../jobmaster/MiniDispatcherRestEndpoint.java | 7 +- .../flink/runtime/rest/JobRestEndpointFactory.java | 5 +- .../flink/runtime/rest/RestEndpointFactory.java| 14 ++- .../runtime/rest/SessionRestEndpointFactory.java | 5 +- ...hCache.java => DefaultExecutionGraphCache.java} | 27 + .../rest/handler/legacy/ExecutionGraphCache.java | 128 ++--- .../runtime/webmonitor/WebMonitorEndpoint.java | 36 -- .../runtime/blob}/NoOpTransientBlobService.java| 10 +- .../rest/handler/job/JobConfigHandlerTest.java | 4 +- .../rest/handler/job/JobExceptionsHandlerTest.java | 4 +- .../SubtaskCurrentAttemptDetailsHandlerTest.java | 4 +- ...askExecutionAttemptAccumulatorsHandlerTest.java | 4 +- .../SubtaskExecutionAttemptDetailsHandlerTest.java | 4 +- .../metrics/JobVertexWatermarksHandlerTest.java| 4 +- ...st.java => DefaultExecutionGraphCacheTest.java} | 18 +-- .../util/DocumentingDispatcherRestEndpoint.java| 58 +- .../util/NoOpExecutionGraphCache.java} | 27 +++-- .../webmonitor/TestingExecutionGraphCache.java | 113 ++ .../runtime/webmonitor/WebMonitorEndpointTest.java | 79 + 21 files changed, 309 insertions(+), 253 deletions(-) copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCache.java => DefaultExecutionGraphCache.java} (83%) rename {flink-docs/src/main/java/org/apache/flink/docs/rest => flink-runtime/src/test/java/org/apache/flink/runtime/blob}/NoOpTransientBlobService.java (84%) rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCacheTest.java => DefaultExecutionGraphCacheTest.java} (93%) copy flink-runtime/src/test/java/org/apache/flink/runtime/{dispatcher/NoOpJobGraphWriter.java => rest/util/NoOpExecutionGraphCache.java} (52%) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
[flink] 02/06: [hotfix] Make NoOpTransientBlobService a public testing class
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5cfa396eede2f714a2ee78953d4ab1ed5a152481 Author: Till Rohrmann AuthorDate: Thu Apr 23 09:46:56 2020 +0200 [hotfix] Make NoOpTransientBlobService a public testing class --- .../runtime/blob/NoOpTransientBlobService.java | 75 ++ .../util/DocumentingDispatcherRestEndpoint.java| 55 +--- 2 files changed, 76 insertions(+), 54 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/NoOpTransientBlobService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/NoOpTransientBlobService.java new file mode 100644 index 000..3f0bf2f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/NoOpTransientBlobService.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +/** + * No-op implementation of {@link TransientBlobService}. + */ +public enum NoOpTransientBlobService implements TransientBlobService { + INSTANCE; + + @Override + public File getFile(TransientBlobKey key) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public File getFile(JobID jobId, TransientBlobKey key) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public TransientBlobKey putTransient(byte[] value) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public TransientBlobKey putTransient(InputStream inputStream) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean deleteFromCache(TransientBlobKey key) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean deleteFromCache(JobID jobId, TransientBlobKey key) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException {} +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java index db45ee0..e727ab2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java @@ -18,13 +18,11 @@ package org.apache.flink.runtime.rest.util; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.runtime.blob.TransientBlobKey; -import org.apache.flink.runtime.blob.TransientBlobService; +import org.apache.flink.runtime.blob.NoOpTransientBlobService; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; import org.apache.flink.runtime.leaderelection.LeaderContender; @@ -43,9 +41,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import javax.annotation.Nonnull; -import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.util.List; import java.util.UUID; import
[flink] 03/06: [FLINK-17308] Pass ScheduledExecutorService to WebMonitorEndpoint
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit bf0cfa606487c742a38cfccfee780a613894a8d8 Author: Till Rohrmann AuthorDate: Thu Apr 23 10:19:50 2020 +0200 [FLINK-17308] Pass ScheduledExecutorService to WebMonitorEndpoint --- .../flink/runtime/dispatcher/DispatcherRestEndpoint.java | 4 ++-- .../DefaultDispatcherResourceManagerComponentFactory.java | 4 ++-- .../flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java| 4 ++-- .../org/apache/flink/runtime/rest/JobRestEndpointFactory.java | 4 ++-- .../org/apache/flink/runtime/rest/RestEndpointFactory.java | 4 ++-- .../apache/flink/runtime/rest/SessionRestEndpointFactory.java | 4 ++-- .../apache/flink/runtime/webmonitor/WebMonitorEndpoint.java| 10 +- .../runtime/rest/util/DocumentingDispatcherRestEndpoint.java | 2 +- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 0bbb10f..ba6f678 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -42,7 +42,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * REST endpoint for the {@link Dispatcher} component. @@ -58,7 +58,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint resourceManagerRetriever, TransientBlobService transientBlobService, - ExecutorService executor, + ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler) throws IOException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java index e1d1296..84b94a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java @@ -71,7 +71,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components. @@ -137,7 +137,7 @@ public class DefaultDispatcherResourceManagerComponentFactory implements Dispatc 10, Time.milliseconds(50L)); - final ExecutorService executor = WebMonitorEndpoint.createExecutorService( + final ScheduledExecutorService executor = WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS), configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java index f11b669..e9f7d61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import java.io.IOException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * REST endpoint for the {@link JobClusterEntrypoint}. @@ -46,7 +46,7 @@ public class MiniDispatcherRestEndpoint extends WebMonitorEndpoint resourceManagerRetriever, TransientBlobService transientBlobService, - ExecutorService executor, + ScheduledExecutorService executor,
[flink] 06/06: [FLINK-17308] Add regular cleanup task for ExecutionGraphCache
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 851420014584fdfd91308937cd913917c11cf4bd Author: Till Rohrmann AuthorDate: Thu Apr 23 10:53:27 2020 +0200 [FLINK-17308] Add regular cleanup task for ExecutionGraphCache The WebMonitorEndpoint now schedules are regular cleanup task which runs every 2 * WebOptions.REFRESH_INTERVAL and tries to clean up expired ExecutionGraphCache entries. This ensures that we will remove unused entries. This closes #11879. --- .../runtime/webmonitor/WebMonitorEndpoint.java | 21 .../webmonitor/TestingExecutionGraphCache.java | 113 + .../runtime/webmonitor/WebMonitorEndpointTest.java | 79 ++ 3 files changed, 213 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 7ccf12e..a41926b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -136,6 +136,8 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -146,6 +148,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -175,6 +178,9 @@ public class WebMonitorEndpoint extends RestServerEndp private final Collection archivingHandlers = new ArrayList<>(16); + @Nullable + private ScheduledFuture executionGraphCleanupTask; + public WebMonitorEndpoint( RestServerEndpointConfiguration endpointConfiguration, GatewayRetriever leaderRetriever, @@ -725,13 +731,28 @@ public class WebMonitorEndpoint extends RestServerEndp @Override public void startInternal() throws Exception { leaderElectionService.start(this); + startExecutionGraphCacheCleanupTask(); + if (hasWebUI) { log.info("Web frontend listening at {}.", getRestBaseUrl()); } } + private void startExecutionGraphCacheCleanupTask() { + final long cleanupInterval = 2 * restConfiguration.getRefreshInterval(); + executionGraphCleanupTask = executor.scheduleWithFixedDelay( + executionGraphCache::cleanup, + cleanupInterval, + cleanupInterval, + TimeUnit.MILLISECONDS); + } + @Override protected CompletableFuture shutDownInternal() { + if (executionGraphCleanupTask != null) { + executionGraphCleanupTask.cancel(false); + } + executionGraphCache.close(); final CompletableFuture shutdownFuture = FutureUtils.runAfterwards( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java new file mode 100644 index 000..a61bdc3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; + +import java.util.concurrent.CompletableFuture; +import
[flink] 01/06: [hotfix] Remove unused docs.rest.NoOpTransientBlobService
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e48fac8cc0f0255261e4dcb55f7b50eea1608f5c Author: Till Rohrmann AuthorDate: Thu Apr 23 09:44:51 2020 +0200 [hotfix] Remove unused docs.rest.NoOpTransientBlobService --- .../flink/docs/rest/NoOpTransientBlobService.java | 77 -- 1 file changed, 77 deletions(-) diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/NoOpTransientBlobService.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/NoOpTransientBlobService.java deleted file mode 100644 index 1d307c4..000 --- a/flink-docs/src/main/java/org/apache/flink/docs/rest/NoOpTransientBlobService.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.docs.rest; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.blob.TransientBlobKey; -import org.apache.flink.runtime.blob.TransientBlobService; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; - -/** - * No-op implementation of {@link TransientBlobService} used by the {@link RestAPIDocGenerator}. - */ -enum NoOpTransientBlobService implements TransientBlobService { - INSTANCE; - - @Override - public File getFile(TransientBlobKey key) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public File getFile(JobID jobId, TransientBlobKey key) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public TransientBlobKey putTransient(byte[] value) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public TransientBlobKey putTransient(InputStream inputStream) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean deleteFromCache(TransientBlobKey key) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean deleteFromCache(JobID jobId, TransientBlobKey key) { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws IOException {} -}
[flink] 05/06: [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 08025ec5d59659adea517d3f363abedd00c4fe7c Author: Till Rohrmann AuthorDate: Thu Apr 23 10:38:15 2020 +0200 [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache This commit extracts the ExecutionGraphCache interface from the implementation and renames the latter into DefaultExecutionGraphCache. Moreover, it introduces the NoOpExecutionGraphCache implementation which is used for the DocumentingDispatcherRestEndpoint. --- .../flink/runtime/rest/RestEndpointFactory.java| 3 +- ...hCache.java => DefaultExecutionGraphCache.java} | 27 + .../rest/handler/legacy/ExecutionGraphCache.java | 128 ++--- .../rest/handler/job/JobConfigHandlerTest.java | 4 +- .../rest/handler/job/JobExceptionsHandlerTest.java | 4 +- .../SubtaskCurrentAttemptDetailsHandlerTest.java | 4 +- ...askExecutionAttemptAccumulatorsHandlerTest.java | 4 +- .../SubtaskExecutionAttemptDetailsHandlerTest.java | 4 +- .../metrics/JobVertexWatermarksHandlerTest.java| 4 +- ...st.java => DefaultExecutionGraphCacheTest.java} | 18 +-- .../util/DocumentingDispatcherRestEndpoint.java| 3 +- .../runtime/rest/util/NoOpExecutionGraphCache.java | 50 12 files changed, 89 insertions(+), 164 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java index d148c24..b2e21cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -52,7 +53,7 @@ public interface RestEndpointFactory { FatalErrorHandler fatalErrorHandler) throws Exception; static ExecutionGraphCache createExecutionGraphCache(RestHandlerConfiguration restConfiguration) { - return new ExecutionGraphCache( + return new DefaultExecutionGraphCache( restConfiguration.getTimeout(), Time.milliseconds(restConfiguration.getRefreshInterval())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java similarity index 83% copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java index f634a62..a01704e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.util.Preconditions; -import java.io.Closeable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -33,11 +32,9 @@ import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink cluster. Every cache entry - * has an associated time to live after which a new request will trigger the reloading of the - * {@link ArchivedExecutionGraph} from the cluster. + * Default implementation of {@link ExecutionGraphCache}. */ -public class ExecutionGraphCache implements Closeable { +public class DefaultExecutionGraphCache implements ExecutionGraphCache { private final Time timeout; @@ -47,7 +44,7 @@ public class ExecutionGraphCache implements Closeable { private volatile boolean running = true; - public ExecutionGraphCache( + public DefaultExecutionGraphCache( Time timeout, Time timeToLive) { this.timeout = checkNotNull(timeout); @@ -64,22
[flink] 04/06: [FLINK-17308] Pass in ExecutionGraphCache to WebMonitorEndpoint
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 3635b54c7b49ea48936ae4baff91a20a96e2f96c Author: Till Rohrmann AuthorDate: Thu Apr 23 10:26:06 2020 +0200 [FLINK-17308] Pass in ExecutionGraphCache to WebMonitorEndpoint --- .../apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java | 3 +++ .../flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java | 3 +++ .../org/apache/flink/runtime/rest/JobRestEndpointFactory.java| 1 + .../java/org/apache/flink/runtime/rest/RestEndpointFactory.java | 9 + .../apache/flink/runtime/rest/SessionRestEndpointFactory.java| 1 + .../org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java | 5 ++--- .../runtime/rest/util/DocumentingDispatcherRestEndpoint.java | 2 ++ 7 files changed, 21 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index ba6f678..870cb5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; @@ -61,6 +62,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint { MetricFetcher metricFetcher, LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception; + + static ExecutionGraphCache createExecutionGraphCache(RestHandlerConfiguration restConfiguration) { + return new ExecutionGraphCache( + restConfiguration.getTimeout(), + Time.milliseconds(restConfiguration.getRefreshInterval())); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java index b091197..c59d220 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java @@ -60,6 +60,7 @@ public enum SessionRestEndpointFactory implements RestEndpointFactory extends RestServerEndp ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService, + ExecutionGraphCache executionGraphCache, FatalErrorHandler fatalErrorHandler) throws IOException { super(endpointConfiguration); this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever); @@ -194,9 +195,7 @@ public class WebMonitorEndpoint extends RestServerEndp this.transientBlobService = Preconditions.checkNotNull(transientBlobService); this.executor = Preconditions.checkNotNull(executor); - this.executionGraphCache = new ExecutionGraphCache( - restConfiguration.getTimeout(), - Time.milliseconds(restConfiguration.getRefreshInterval())); + this.executionGraphCache = executionGraphCache; this.checkpointStatsCache = new CheckpointStatsCache( restConfiguration.getMaxCheckpointStatisticCacheEntries()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java index c3db3c3..9f84bce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.RestEndpointFactory;
[flink] branch master updated (eeaf08f -> 948a42f)
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from eeaf08f [FLINK-16667][python][client] Support the Python dependency configuration options in CliFrontend (#11702) add 948a42f [FLINK-17344][test] Fix unstability on IdleTime metric test. No new revisions were added by this update. Summary of changes: .../flink/runtime/io/network/api/writer/RecordWriterTest.java | 11 --- .../runtime/tasks/mailbox/TaskMailboxProcessorTest.java | 4 2 files changed, 8 insertions(+), 7 deletions(-)
[flink] branch master updated (bf39d75 -> eeaf08f)
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from bf39d75 [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header add eeaf08f [FLINK-16667][python][client] Support the Python dependency configuration options in CliFrontend (#11702) No new revisions were added by this update. Summary of changes: .../org/apache/flink/client/cli/CliFrontend.java | 39 ++-- .../apache/flink/client/cli/CliFrontendParser.java | 8 +- .../flink/client/cli/ExecutionConfigAccessor.java | 12 +-- .../apache/flink/client/cli/ProgramOptions.java| 110 ++--- .../flink/client/cli/ProgramOptionsUtils.java | 96 ++ .../flink/client/program/PackagedProgram.java | 35 +-- .../flink/client/program/PackagedProgramUtils.java | 42 .../client/cli/CliFrontendPackageProgramTest.java | 12 +-- .../flink/client/cli/CliFrontendRunTest.java | 27 + .../flink/client/cli/PythonProgramOptions.java | 97 ++ .../flink/python/util/PythonDependencyUtils.java | 23 + .../flink/client/cli/PythonProgramOptionsTest.java | 107 .../python/util/PythonDependencyUtilsTest.java | 6 +- .../client/gateway/local/ExecutionContext.java | 2 +- 14 files changed, 442 insertions(+), 174 deletions(-) create mode 100644 flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java create mode 100644 flink-python/src/main/java/org/apache/flink/client/cli/PythonProgramOptions.java create mode 100644 flink-python/src/test/java/org/apache/flink/client/cli/PythonProgramOptionsTest.java
[flink] 03/04: [FLINK-17169][table-blink] Fix allowLateness shouldn't affect producing updates of emit strategy
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 670ef41ed59dbf7069e313e5b0f8b373bd4245b4 Author: Jark Wu AuthorDate: Tue Apr 21 00:04:12 2020 +0800 [FLINK-17169][table-blink] Fix allowLateness shouldn't affect producing updates of emit strategy This closes #11797 --- .../planner/plan/utils/WindowEmitStrategy.scala| 2 +- .../plan/stream/sql/agg/WindowAggregateTest.xml| 272 - .../plan/stream/sql/agg/WindowAggregateTest.scala | 28 +++ .../operators/window/WindowOperatorBuilder.java| 2 - 4 files changed, 191 insertions(+), 113 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala index eff8a48..07b30f4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala @@ -63,7 +63,7 @@ class WindowEmitStrategy( def produceUpdates: Boolean = { if (isEventTime) { - allowLateness > 0 || earlyFireDelayEnabled || lateFireDelayEnabled + earlyFireDelayEnabled || lateFireDelayEnabled } else { earlyFireDelayEnabled } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index 13553e4..2459697 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -134,6 +134,32 @@ Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3]) ]]> + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + - + - + - + + + + + + + + + + + + @@ -638,4 +664,30 @@ Union(all=[true], union=[EXPR$0]) ]]> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala index 83d5544..fe66dd4 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala @@ -18,10 +18,12 @@ package org.apache.flink.table.planner.plan.stream.sql.agg +import org.apache.flink.api.common.time.Time import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge +import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_LATE_FIRE_DELAY, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED} import org.apache.flink.table.planner.utils.TableTestBase import org.junit.Test @@ -414,4 +416,30 @@ class WindowAggregateTest extends TableTestBase { util.verifyPlan(sql) } + + @Test + def testWindowAggregateWithLateFire(): Unit = { +util.conf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, true) +util.conf.getConfiguration.setString(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, "5s") +util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) +val sql = + """ +|SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt +|FROM MyTable +|GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND) +|""".stripMargin +util.verifyPlanWithTrait(sql) + } + + @Test + def testWindowAggregateWithAllowLatenessOnly(): Unit = { +util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) +val sql = + """ +|SELECT
[flink] branch master updated (26af121 -> bf39d75)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 26af121 [hotfix][docs] link tags in zh docs require .zh.md links new 91f4a3c [FLINK-17169][core] Add shortString() method to RowKind to get short string representation new 7662e0b [FLINK-17169][table-planner-blink] Rename StreamRecordUtils#record() to insertRecord() new 670ef41 [FLINK-17169][table-blink] Fix allowLateness shouldn't affect producing updates of emit strategy new bf39d75 [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../main/java/org/apache/flink/types/RowKind.java | 67 +++- ...bstractBaseRowPythonScalarFunctionOperator.java | 2 +- .../BaseRowPythonScalarFunctionOperator.java | 2 +- .../BaseRowArrowPythonScalarFunctionOperator.java | 2 +- .../table/BaseRowPythonTableFunctionOperator.java | 4 +- .../BaseRowPythonScalarFunctionOperatorTest.java | 6 +- ...seRowArrowPythonScalarFunctionOperatorTest.java | 6 +- .../BaseRowPythonTableFunctionOperatorTest.java| 8 +- .../table/planner/codegen/CalcCodeGenerator.scala | 2 +- .../flink/table/planner/codegen/CodeGenUtils.scala | 6 +- .../planner/codegen/CorrelateCodeGenerator.scala | 12 +- .../planner/codegen/EqualiserCodeGenerator.scala | 2 +- .../planner/codegen/ExpandCodeGenerator.scala | 2 +- .../planner/codegen/LookupJoinCodeGenerator.scala | 2 +- .../codegen/agg/AggsHandlerCodeGenerator.scala | 4 +- .../physical/stream/StreamExecDeduplicate.scala| 25 +- .../StreamExecGroupWindowAggregateBase.scala | 2 +- .../planner/plan/utils/WindowEmitStrategy.scala| 2 +- .../flink/table/planner/utils/BaseRowTestUtil.java | 6 +- .../plan/stream/sql/agg/WindowAggregateTest.xml| 272 -- .../plan/stream/sql/agg/WindowAggregateTest.scala | 28 ++ ...AbstractTwoInputStreamOperatorWithTTLTest.scala | 6 +- .../harness/GroupAggregateHarnessTest.scala| 68 ++-- .../harness/TableAggregateHarnessTest.scala| 90 ++--- .../planner/runtime/stream/sql/CalcITCase.scala| 4 +- .../planner/runtime/stream/sql/RankITCase.scala| 8 +- .../planner/runtime/stream/sql/ValuesITCase.scala | 2 +- .../table/planner/runtime/utils/TestSinkUtil.scala | 3 +- .../org/apache/flink/table/dataformat/BaseRow.java | 15 +- .../apache/flink/table/dataformat/BinaryRow.java | 19 +- .../flink/table/dataformat/BinaryRowWriter.java| 6 +- .../apache/flink/table/dataformat/ColumnarRow.java | 11 +- .../apache/flink/table/dataformat/GenericRow.java | 2 +- .../apache/flink/table/dataformat/JoinedRow.java | 12 +- .../apache/flink/table/dataformat/NestedRow.java | 11 +- .../flink/table/dataformat/ObjectArrayRow.java | 24 +- .../flink/table/dataformat/UpdatableRow.java | 10 +- .../flink/table/dataformat/util/BaseRowUtil.java | 33 +- .../operators/aggregate/GroupAggFunction.java | 18 +- .../aggregate/MiniBatchGlobalGroupAggFunction.java | 27 +- .../aggregate/MiniBatchGroupAggFunction.java | 22 +- .../deduplicate/DeduplicateFunctionHelper.java | 48 ++- .../DeduplicateKeepLastRowFunction.java| 20 +- .../MiniBatchDeduplicateKeepLastRowFunction.java | 5 +- .../join/lookup/AsyncLookupJoinRunner.java | 4 +- .../operators/join/lookup/LookupJoinRunner.java| 2 +- .../join/stream/StreamingJoinOperator.java | 72 ++-- .../join/stream/StreamingSemiAntiJoinOperator.java | 53 +-- .../temporal/TemporalProcessTimeJoinOperator.java | 2 +- .../join/temporal/TemporalRowTimeJoinOperator.java | 5 +- .../operators/rank/AbstractTopNFunction.java | 54 +-- .../operators/rank/AppendOnlyTopNFunction.java | 33 +- .../operators/rank/RetractableTopNFunction.java| 77 ++-- .../operators/rank/UpdatableTopNFunction.java | 95 ++--- .../runtime/operators/sort/StreamSortOperator.java | 3 +- .../operators/window/AggregateWindowOperator.java | 39 +- .../window/TableAggregateWindowOperator.java | 4 +- .../runtime/operators/window/WindowOperator.java | 12 +- .../operators/window/WindowOperatorBuilder.java| 18 +- .../table/runtime/typeutils/BaseRowSerializer.java | 4 +- .../apache/flink/table/dataformat/BaseRowTest.java | 7 +- .../flink/table/dataformat/BinaryRowTest.java | 13 +- .../DeduplicateKeepFirstRowFunctionTest.java | 32 +- .../DeduplicateKeepLastRowFunctionTest.java| 95 +++-- ...niBatchDeduplicateKeepFirstRowFunctionTest.java | 32 +- ...iniBatchDeduplicateKeepLastRowFunctionTest.java | 123 ---
[flink] 01/04: [FLINK-17169][core] Add shortString() method to RowKind to get short string representation
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 91f4a3c5081a99822430ba0683bce381aec188bd Author: Jark Wu AuthorDate: Fri Apr 17 22:11:44 2020 +0800 [FLINK-17169][core] Add shortString() method to RowKind to get short string representation This closes #11797 --- .../main/java/org/apache/flink/types/RowKind.java | 67 -- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/types/RowKind.java b/flink-core/src/main/java/org/apache/flink/types/RowKind.java index 3b45001..a1acf7d 100644 --- a/flink-core/src/main/java/org/apache/flink/types/RowKind.java +++ b/flink-core/src/main/java/org/apache/flink/types/RowKind.java @@ -29,7 +29,7 @@ public enum RowKind { /** * Insertion operation. */ - INSERT, + INSERT("+I", (byte) 0), /** * Update operation with the previous content of the updated row. @@ -38,7 +38,7 @@ public enum RowKind { * to retract the previous row first. It is useful in cases of a non-idempotent update, i.e., an * update of a row that is not uniquely identifiable by a key. */ - UPDATE_BEFORE, + UPDATE_BEFORE("-U", (byte) 1), /** * Update operation with new content of the updated row. @@ -47,10 +47,69 @@ public enum RowKind { * needs to retract the previous row first. OR it describes an idempotent update, i.e., an update * of a row that is uniquely identifiable by a key. */ - UPDATE_AFTER, + UPDATE_AFTER("+U", (byte) 2), /** * Deletion operation. */ - DELETE + DELETE("-D", (byte) 3); + + private final String shortString; + + private final byte value; + + /** +* Creates a {@link RowKind} enum with the given short string and byte value representation of +* the {@link RowKind}. +*/ + RowKind(String shortString, byte value) { + this.shortString = shortString; + this.value = value; + } + + /** +* Returns a short string representation of this {@link RowKind}. +* +* +* "+I" represents {@link #INSERT}. +* "-U" represents {@link #UPDATE_BEFORE}. +* "+U" represents {@link #UPDATE_AFTER}. +* "-D" represents {@link #DELETE}. +* +*/ + public String shortString() { + return shortString; + } + + /** +* Returns the byte value representation of this {@link RowKind}. The byte value is used +* for serialization and deserialization. +* +* +* "0" represents {@link #INSERT}. +* "1" represents {@link #UPDATE_BEFORE}. +* "2" represents {@link #UPDATE_AFTER}. +* "3" represents {@link #DELETE}. +* +*/ + public byte toByteValue() { + return value; + } + + /** +* Creates a {@link RowKind} from the given byte value. Each {@link RowKind} has a byte +* value representation. +* +* @see #toByteValue() for mapping of byte value and {@link RowKind}. +*/ + public static RowKind fromByteValue(byte value) { + switch (value) { + case 0: return INSERT; + case 1: return UPDATE_BEFORE; + case 2: return UPDATE_AFTER; + case 3: return DELETE; + default: throw new UnsupportedOperationException( + "Unsupported byte value '" + value + "' for row kind."); + } + } }
[flink-docker] branch master updated: [FLINK-17346] Deduplicate process setup
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git The following commit(s) were added to refs/heads/master by this push: new 887ac36 [FLINK-17346] Deduplicate process setup 887ac36 is described below commit 887ac36c2b9611095f989eb0cff527fa196f0456 Author: Chesnay Schepler AuthorDate: Thu Apr 23 10:54:17 2020 +0200 [FLINK-17346] Deduplicate process setup --- testing/testing_lib.sh | 104 + 1 file changed, 26 insertions(+), 78 deletions(-) diff --git a/testing/testing_lib.sh b/testing/testing_lib.sh index 504029d..36dfe32 100644 --- a/testing/testing_lib.sh +++ b/testing/testing_lib.sh @@ -45,51 +45,42 @@ function build_image() { docker build -t "$image_name" "$dockerfile_dir" } -function run_jobmanager() { -local dockerfile -dockerfile="$1" +function internal_run() { +local dockerfile="$1" +local docker_run_command="$2" +local args="$3" local image_tag image_name image_tag="$(image_tag "$dockerfile")" image_name="$(image_name "$image_tag")" -echo >&2 "===> Starting ${image_tag} jobmanager..." - -# Prints container ID -docker run \ ---rm \ ---detach \ ---name "jobmanager" \ ---network "$NETWORK_NAME" \ ---publish 6123:6123 \ ---publish 8081:8081 \ --e JOB_MANAGER_RPC_ADDRESS="jobmanager" \ -"$image_name" \ -jobmanager +echo >&2 "===> Starting ${image_tag} ${args}..." + +eval "docker run --rm --detach --network $NETWORK_NAME -e JOB_MANAGER_RPC_ADDRESS=jobmanager ${docker_run_command} $image_name ${args}" +} + +function internal_run_jobmanager() { +internal_run "$1" "--name jobmanager --publish 6123:6123 --publish 8081:8081 $2" jobmanager +} + +function run_jobmanager() { +internal_run_jobmanager "$1" "" } function run_jobmanager_non_root() { -local dockerfile -dockerfile="$1" +internal_run_jobmanager "$1" "--user flink" +} -local image_tag image_name -image_tag="$(image_tag "$dockerfile")" -image_name="$(image_name "$image_tag")" +function internal_run_taskmanager() { +internal_run "$1" "--name taskmanager $2" "taskmanager" +} -echo >&2 "===> Starting ${image_tag} jobmanager as non-root..." - -# Prints container ID -docker run \ ---rm \ ---detach \ ---name "jobmanager" \ ---network "$NETWORK_NAME" \ ---user flink \ ---publish 6123:6123 \ ---publish 8081:8081 \ --e JOB_MANAGER_RPC_ADDRESS="jobmanager" \ -"$image_name" \ -jobmanager +function run_taskmanager() { + internal_run_taskmanager "$1" "" +} + +function run_taskmanager_non_root() { + internal_run_taskmanager "$1" "--user flink" } function wait_for_jobmanager() { @@ -131,49 +122,6 @@ function wait_for_jobmanager() { echo >&2 "===> ${image_tag} jobmanager is ready." } -function run_taskmanager() { -local dockerfile -dockerfile="$1" - -local image_tag image_name -image_tag="$(image_tag "$dockerfile")" -image_name="$(image_name "$image_tag")" - -echo >&2 "===> Starting ${image_tag} taskmanager..." - -# Prints container ID -docker run \ ---rm \ ---detach \ ---name "taskmanager" \ ---network "$NETWORK_NAME" \ --e JOB_MANAGER_RPC_ADDRESS="jobmanager" \ -"$image_name" \ -taskmanager -} - -function run_taskmanager_non_root() { -local dockerfile -dockerfile="$1" - -local image_tag image_name -image_tag="$(image_tag "$dockerfile")" -image_name="$(image_name "$image_tag")" - -echo >&2 "===> Starting ${image_tag} taskmanager as non-root..." - -# Prints container ID -docker run \ ---rm \ ---detach \ ---name "taskmanager" \ ---network "$NETWORK_NAME" \ ---user flink \ --e JOB_MANAGER_RPC_ADDRESS="jobmanager" \ -"$image_name" \ -taskmanager -} - function test_image() { local dockerfile dockerfile="$1"
[flink] branch master updated: [hotfix][docs] link tags in zh docs require .zh.md links
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 26af121 [hotfix][docs] link tags in zh docs require .zh.md links 26af121 is described below commit 26af1215d46089f3c66407e108ede9968cb9b8f7 Author: David Anderson AuthorDate: Fri Apr 24 09:23:06 2020 +0200 [hotfix][docs] link tags in zh docs require .zh.md links --- docs/dev/event_time.zh.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/dev/event_time.zh.md b/docs/dev/event_time.zh.md index a25122f..dbe964f 100644 --- a/docs/dev/event_time.zh.md +++ b/docs/dev/event_time.zh.md @@ -26,13 +26,13 @@ under the License. In this section you will learn about writing time-aware Flink programs. Please take a look at [Timely Stream Processing]({% link -concepts/timely-stream-processing.md %}) to learn about the concepts behind +concepts/timely-stream-processing.zh.md %}) to learn about the concepts behind timely stream processing. For information about how to use time in Flink programs refer to -[windowing]({% link dev/stream/operators/windows.md %}) and +[windowing]({% link dev/stream/operators/windows.zh.md %}) and [ProcessFunction]({% link -dev/stream/operators/process_function.md %}). +dev/stream/operators/process_function.zh.md %}). * toc {:toc}
[flink-docker] branch master updated: [FLINK-17187] Add utility method for setting config options
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git The following commit(s) were added to refs/heads/master by this push: new 379970e [FLINK-17187] Add utility method for setting config options 379970e is described below commit 379970e8d2a9e138d1291d83b47e7ea643421b3a Author: Chesnay Schepler AuthorDate: Thu Apr 16 13:34:29 2020 +0200 [FLINK-17187] Add utility method for setting config options --- 1.10/scala_2.11-debian/docker-entrypoint.sh | 64 +++-- 1.10/scala_2.12-debian/docker-entrypoint.sh | 64 +++-- 1.9/scala_2.11-debian/docker-entrypoint.sh | 64 +++-- 1.9/scala_2.12-debian/docker-entrypoint.sh | 64 +++-- docker-entrypoint.sh| 64 +++-- 5 files changed, 120 insertions(+), 200 deletions(-) diff --git a/1.10/scala_2.11-debian/docker-entrypoint.sh b/1.10/scala_2.11-debian/docker-entrypoint.sh index 12e5d5a..011ccea 100755 --- a/1.10/scala_2.11-debian/docker-entrypoint.sh +++ b/1.10/scala_2.11-debian/docker-entrypoint.sh @@ -56,6 +56,27 @@ copy_plugins_if_required() { done } +set_config_option() { + local option=$1 + local value=$2 + + # escape periods for usage in regular expressions + local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g") + + # either override an existing entry, or append a new one + if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then +sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}" + else +echo "${option}: ${value}" >> "${CONF_FILE}" + fi +} + +set_common_options() { +set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS} +set_config_option blob.server.port 6124 +set_config_option query.server.port 6125 +} + if [ "$1" = "help" ]; then echo "Usage: $(basename "$0") (jobmanager|taskmanager|help)" exit 0 @@ -64,23 +85,7 @@ elif [ "$1" = "jobmanager" ]; then echo "Starting Job Manager" copy_plugins_if_required -if grep -E "^jobmanager\.rpc\.address:.*" "${CONF_FILE}" > /dev/null; then -sed -i -e "s/jobmanager\.rpc\.address:.*/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "${CONF_FILE}" -else -echo "jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}" >> "${CONF_FILE}" -fi - -if grep -E "^blob\.server\.port:.*" "${CONF_FILE}" > /dev/null; then -sed -i -e "s/blob\.server\.port:.*/blob.server.port: 6124/g" "${CONF_FILE}" -else -echo "blob.server.port: 6124" >> "${CONF_FILE}" -fi - -if grep -E "^query\.server\.port:.*" "${CONF_FILE}" > /dev/null; then -sed -i -e "s/query\.server\.port:.*/query.server.port: 6125/g" "${CONF_FILE}" -else -echo "query.server.port: 6125" >> "${CONF_FILE}" -fi +set_common_options if [ -n "${FLINK_PROPERTIES}" ]; then echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}" @@ -95,29 +100,8 @@ elif [ "$1" = "taskmanager" ]; then TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)} -if grep -E "^jobmanager\.rpc\.address:.*" "${CONF_FILE}" > /dev/null; then -sed -i -e "s/jobmanager\.rpc\.address:.*/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "${CONF_FILE}" -else -echo "jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}" >> "${CONF_FILE}" -fi - -if grep -E "^taskmanager\.numberOfTaskSlots:.*" "${CONF_FILE}" > /dev/null; then -sed -i -e "s/taskmanager\.numberOfTaskSlots:.*/taskmanager.numberOfTaskSlots: ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}/g" "${CONF_FILE}" -else -echo "taskmanager.numberOfTaskSlots: ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" >> "${CONF_FILE}" -fi - -if grep -E "^blob\.server\.port:.*" "${CONF_FILE}" > /dev/null; then -sed -i -e "s/blob\.server\.port:.*/blob.server.port: 6124/g" "${CONF_FILE}" -else -echo "blob.server.port: 6124" >> "${CONF_FILE}" -fi - -if grep -E "^query\.server\.port:.*" "${CONF_FILE}" > /dev/null; then -sed -i -e "s/query\.server\.port:.*/query.server.port: 6125/g" "${CONF_FILE}" -else -echo "query.server.port: 6125" >> "${CONF_FILE}" -fi +set_common_options +set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS} if [ -n "${FLINK_PROPERTIES}" ]; then echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}" diff --git a/1.10/scala_2.12-debian/docker-entrypoint.sh b/1.10/scala_2.12-debian/docker-entrypoint.sh index 12e5d5a..011ccea 100755 --- a/1.10/scala_2.12-debian/docker-entrypoint.sh +++ b/1.10/scala_2.12-debian/docker-entrypoint.sh @@ -56,6 +56,27 @@ copy_plugins_if_required() { done } +set_config_option() { + local option=$1 + local value=$2 + + # escape periods for usage in
[flink-docker] branch master updated (9aa8079 -> e7e1903)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git. from 9aa8079 [FLINK-17287][github] Disable merge commit button new dbbbfba [FLINK-17365] Add GPG key for 1.9.3 release new e7e1903 [FLINK-17365] Update Dockerfiles for 1.9.3 release The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: 1.9/scala_2.11-debian/Dockerfile | 4 ++-- 1.9/scala_2.12-debian/Dockerfile | 4 ++-- add-version.sh | 2 ++ 3 files changed, 6 insertions(+), 4 deletions(-)
[flink-docker] 02/02: [FLINK-17365] Update Dockerfiles for 1.9.3 release
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit e7e19032d43fc8d93cb3763faf32f1bff9df891d Author: Chesnay Schepler AuthorDate: Fri Apr 24 09:32:43 2020 +0200 [FLINK-17365] Update Dockerfiles for 1.9.3 release --- 1.9/scala_2.11-debian/Dockerfile | 4 ++-- 1.9/scala_2.12-debian/Dockerfile | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/1.9/scala_2.11-debian/Dockerfile b/1.9/scala_2.11-debian/Dockerfile index d077523..49046f8 100644 --- a/1.9/scala_2.11-debian/Dockerfile +++ b/1.9/scala_2.11-debian/Dockerfile @@ -44,9 +44,9 @@ RUN set -ex; \ gosu nobody true # Configure Flink version -ENV FLINK_VERSION=1.9.2 \ +ENV FLINK_VERSION=1.9.3 \ SCALA_VERSION=2.11 \ -GPG_KEY=EF88474C564C7A608A822EEC3FF96A2057B6476C +GPG_KEY=6B6291A8502BA8F0913AE04DDEB95B05BF075300 # Prepare environment ENV FLINK_HOME=/opt/flink diff --git a/1.9/scala_2.12-debian/Dockerfile b/1.9/scala_2.12-debian/Dockerfile index ed8ec7c..aad8384 100644 --- a/1.9/scala_2.12-debian/Dockerfile +++ b/1.9/scala_2.12-debian/Dockerfile @@ -44,9 +44,9 @@ RUN set -ex; \ gosu nobody true # Configure Flink version -ENV FLINK_VERSION=1.9.2 \ +ENV FLINK_VERSION=1.9.3 \ SCALA_VERSION=2.12 \ -GPG_KEY=EF88474C564C7A608A822EEC3FF96A2057B6476C +GPG_KEY=6B6291A8502BA8F0913AE04DDEB95B05BF075300 # Prepare environment ENV FLINK_HOME=/opt/flink
[flink-docker] 01/02: [FLINK-17365] Add GPG key for 1.9.3 release
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit dbbbfbaa129037ae37a102da85554436b083691b Author: Chesnay Schepler AuthorDate: Fri Apr 24 09:32:19 2020 +0200 [FLINK-17365] Add GPG key for 1.9.3 release --- add-version.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/add-version.sh b/add-version.sh index d2f5f66..d6db723 100755 --- a/add-version.sh +++ b/add-version.sh @@ -81,6 +81,8 @@ elif [ "$flink_version" = "1.9.1" ]; then gpg_key="E2C45417BED5C104154F341085BACB5AEFAE3202" elif [ "$flink_version" = "1.9.2" ]; then gpg_key="EF88474C564C7A608A822EEC3FF96A2057B6476C" +elif [ "$flink_version" = "1.9.3" ]; then +gpg_key="6B6291A8502BA8F0913AE04DDEB95B05BF075300" elif [ "$flink_version" = "1.10.0" ]; then gpg_key="BB137807CEFBE7DD2616556710B12A1F89C115E8" else
[flink-docker] branch master updated: [FLINK-17287][github] Disable merge commit button
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git The following commit(s) were added to refs/heads/master by this push: new 9aa8079 [FLINK-17287][github] Disable merge commit button 9aa8079 is described below commit 9aa8079fc95bb8d7fc704a880471035fd5b1af61 Author: Chesnay Schepler AuthorDate: Tue Apr 21 10:07:26 2020 +0200 [FLINK-17287][github] Disable merge commit button --- .asf.yaml | 4 1 file changed, 4 insertions(+) diff --git a/.asf.yaml b/.asf.yaml index ae3274d..867d33f 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -4,3 +4,7 @@ github: labels: - flink - docker + enabled_merge_buttons: +squash: true +merge: false +rebase: true
[flink] branch master updated: [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for JobGraph generation
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new ef4daeb [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for JobGraph generation ef4daeb is described below commit ef4daeba7881cecc1548e387ab68d829f998dc67 Author: Zhu Zhu AuthorDate: Fri Apr 24 11:39:36 2020 +0800 [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for JobGraph generation --- .../api/graph/GlobalDataExchangeMode.java | 48 +++ .../flink/streaming/api/graph/StreamGraph.java | 22 +-- .../streaming/api/graph/StreamGraphGenerator.java | 12 +- .../api/graph/StreamingJobGraphGenerator.java | 32 - .../api/graph/StreamingJobGraphGeneratorTest.java | 73 -- ...aphGeneratorWithGlobalDataExchangeModeTest.java | 153 + .../flink/table/planner/utils/ExecutorUtils.java | 3 +- 7 files changed, 241 insertions(+), 102 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java new file mode 100644 index 000..a74d9f3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; + +/** + * This mode decides the default {@link ResultPartitionType} of job edges. + * Note that this only affects job edges which are {@link ShuffleMode#UNDEFINED}. + */ +public enum GlobalDataExchangeMode { + /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */ + ALL_EDGES_BLOCKING, + + /** +* Set job edges with {@link ForwardPartitioner} to be {@link ResultPartitionType#PIPELINED_BOUNDED} +* and other edges to be {@link ResultPartitionType#BLOCKING}. +**/ + FORWARD_EDGES_PIPELINED, + + /** +* Set job edges with {@link ForwardPartitioner} or {@link RescalePartitioner} to be +* {@link ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link ResultPartitionType#BLOCKING}. +**/ + POINTWISE_EDGES_PIPELINED, + + /** Set all job edges {@link ResultPartitionType#PIPELINED_BOUNDED}. */ + ALL_EDGES_PIPELINED +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 39393fc..83d9aca 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -96,11 +96,7 @@ public class StreamGraph implements Pipeline { private TimeCharacteristic timeCharacteristic; - /** -* If there are some stream edges that can not be chained and the shuffle mode of edge is not -* specified, translate these edges into {@code BLOCKING} result partition type. -*/ - private boolean blockingConnectionsBetweenChains; + private GlobalDataExchangeMode globalDataExchangeMode; /** Flag to indicate whether to put all vertices into the same slot sharing group by default. */ private boolean allVerticesInSameSlotSharingGroupByDefault = true; @@ -201,20 +197,12 @@ public class StreamGraph implements Pipeline { this.timeCharacteristic = timeCharacteristic; } - /** -* If there are some stream edges that can not be chained and the shuffle mode of edge is not -* specified, translate these edges into {@code BLOCKING} result partition