[flink-web] 01/02: Add Apache Flink release 1.9.3

2020-04-24 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit af2d8de132b73f0dae6fd88912fe1c92202f3efe
Author: Dian Fu 
AuthorDate: Fri Apr 17 20:29:14 2020 +0800

Add Apache Flink release 1.9.3
---
 _config.yml|  56 ---
 _posts/2020-04-24-release-1.9.3.md | 136 +
 2 files changed, 166 insertions(+), 26 deletions(-)

diff --git a/_config.yml b/_config.yml
index b28fa08..876e449 100644
--- a/_config.yml
+++ b/_config.yml
@@ -103,48 +103,48 @@ flink_releases:
   -
   version_short: 1.9
   binary_release:
-  name: "Apache Flink 1.9.2"
+  name: "Apache Flink 1.9.3"
   scala_211:
-  id: "192-download_211"
-  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz"
-  asc_url: 
"https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz.asc"
-  sha512_url: 
"https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz.sha512"
+  id: "193-download_211"
+  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.11.tgz"
+  asc_url: 
"https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.11.tgz.asc"
+  sha512_url: 
"https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.11.tgz.sha512"
   scala_212:
-  id: "192-download_212"
-  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.12.tgz"
-  asc_url: 
"https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.12.tgz.asc"
-  sha512_url: 
"https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.12.tgz.sha512"
+  id: "193-download_212"
+  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.12.tgz"
+  asc_url: 
"https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.12.tgz.asc"
+  sha512_url: 
"https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-bin-scala_2.12.tgz.sha512"
   source_release:
-  name: "Apache Flink 1.9.2"
-  id: "192-download-source"
-  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.9.2/flink-1.9.2-src.tgz"
-  asc_url: 
"https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-src.tgz.asc"
-  sha512_url: 
"https://downloads.apache.org/flink/flink-1.9.2/flink-1.9.2-src.tgz.sha512"
+  name: "Apache Flink 1.9.3"
+  id: "193-download-source"
+  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.9.3/flink-1.9.3-src.tgz"
+  asc_url: 
"https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-src.tgz.asc"
+  sha512_url: 
"https://downloads.apache.org/flink/flink-1.9.3/flink-1.9.3-src.tgz.sha512"
   optional_components:
 -
   name: "Avro SQL Format"
   category: "SQL Formats"
   scala_dependent: false
-  id: 192-sql-format-avro
-  url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.2/flink-avro-1.9.2.jar
-  asc_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.2/flink-avro-1.9.2.jar.asc
-  sha_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.2/flink-avro-1.9.2.jar.sha1
+  id: 193-sql-format-avro
+  url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.3/flink-avro-1.9.3.jar
+  asc_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.3/flink-avro-1.9.3.jar.asc
+  sha_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.3/flink-avro-1.9.3.jar.sha1
 -
   name: "CSV SQL Format"
   category: "SQL Formats"
   scala_dependent: false
-  id: 192-sql-format-csv
-  url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.2/flink-csv-1.9.2.jar
-  asc_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.2/flink-csv-1.9.2.jar.asc
-  sha_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.2/flink-csv-1.9.2.jar.sha1
+  id: 193-sql-format-csv
+  url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.3/flink-csv-1.9.3.jar
+  asc_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.3/flink-csv-1.9.3.jar.asc
+  sha_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.3/flink-csv-1.9.3.jar.sha1
 -
   name: "JSON SQL Format"
   category: "SQL Formats"
   scala_dependent: false
-  id: 192-sql-format-json
-  url: 

[flink-web] branch asf-site updated (66d0403 -> 155914a)

2020-04-24 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from 66d0403  Update status for Hequn to PMC in community web page
 new af2d8de  Add Apache Flink release 1.9.3
 new 155914a  Rebuild website

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 _config.yml|  56 +-
 _posts/2020-04-24-release-1.9.3.md | 136 
 content/blog/feed.xml  | 779 -
 content/blog/index.html|  40 +-
 content/blog/page10/index.html |  48 +-
 content/blog/page11/index.html |  29 +
 content/blog/page2/index.html  |  40 +-
 content/blog/page3/index.html  |  40 +-
 content/blog/page4/index.html  |  40 +-
 content/blog/page5/index.html  |  40 +-
 content/blog/page6/index.html  |  38 +-
 content/blog/page7/index.html  |  36 +-
 content/blog/page8/index.html  |  36 +-
 content/blog/page9/index.html  |  42 +-
 content/downloads.html |  29 +-
 content/index.html |   8 +-
 .../04/24/release-1.9.3.html}  | 106 +--
 content/zh/downloads.html  |  33 +-
 content/zh/index.html  |   8 +-
 19 files changed, 689 insertions(+), 895 deletions(-)
 create mode 100644 _posts/2020-04-24-release-1.9.3.md
 copy content/news/{2018/08/21/release-1.5.3.html => 
2020/04/24/release-1.9.3.html} (64%)



[flink] branch master updated: [FLINK-17333][doc] add doc for 'create catalog' ddl

2020-04-24 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f6f2fb5  [FLINK-17333][doc] add doc for 'create catalog' ddl
f6f2fb5 is described below

commit f6f2fb56c3ee1d801f0db62fed357f918d40d4d0
Author: bowen.li 
AuthorDate: Wed Apr 22 22:05:54 2020 -0700

[FLINK-17333][doc] add doc for 'create catalog' ddl

closes #11871
---
 docs/dev/table/sql/create.md| 18 ++
 docs/dev/table/sql/create.zh.md | 18 ++
 2 files changed, 36 insertions(+)

diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md
index 6e41eac..e44e2bc 100644
--- a/docs/dev/table/sql/create.md
+++ b/docs/dev/table/sql/create.md
@@ -210,6 +210,24 @@ The key and value of expression `key1=val1` should both be 
string literal. See d
 
 {% top %}
 
+## CREATE CATALOG
+
+{% highlight sql %}
+CREATE CATALOG catalog_name
+  WITH (key1=val1, key2=val2, ...)
+{% endhighlight %}
+
+Create a catalog with the given catalog properties. If a catalog with the same 
name already exists, an exception is thrown.
+
+**WITH OPTIONS**
+
+Catalog properties used to store extra information related to this catalog.
+The key and value of expression `key1=val1` should both be string literal.
+
+Check out more details at [Catalogs]({{ site.baseurl 
}}/dev/table/catalogs.html).
+
+{% top %}
+
 ## CREATE DATABASE
 
 {% highlight sql %}
diff --git a/docs/dev/table/sql/create.zh.md b/docs/dev/table/sql/create.zh.md
index a696912..5c6099a 100644
--- a/docs/dev/table/sql/create.zh.md
+++ b/docs/dev/table/sql/create.zh.md
@@ -210,6 +210,24 @@ CREATE TABLE Orders (
 
 {% top %}
 
+## CREATE CATALOG
+
+{% highlight sql %}
+CREATE CATALOG catalog_name
+  WITH (key1=val1, key2=val2, ...)
+{% endhighlight %}
+
+Create a catalog with the given catalog properties. If a catalog with the same 
name already exists, an exception is thrown.
+
+**WITH OPTIONS**
+
+Catalog properties used to store extra information related to this catalog.
+The key and value of expression `key1=val1` should both be string literal.
+
+Check out more details at [Catalogs]({{ site.baseurl 
}}/dev/table/catalogs.html).
+
+{% top %}
+
 ## CREATE DATABASE
 
 {% highlight sql %}



[flink] branch master updated: [FLINK-16812][jdbc] support array types in PostgresRowConverter

2020-04-24 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4121292  [FLINK-16812][jdbc] support array types in 
PostgresRowConverter
4121292 is described below

commit 4121292fbb63dacef29245a2234da68fa499efa6
Author: bowen.li 
AuthorDate: Wed Apr 15 22:26:22 2020 -0700

[FLINK-16812][jdbc] support array types in PostgresRowConverter

closes #11766
---
 .../row/converter/AbstractJDBCRowConverter.java|  5 +-
 .../source/row/converter/JDBCRowConverter.java |  2 +-
 .../source/row/converter/PostgresRowConverter.java | 43 +++
 .../io/jdbc/catalog/PostgresCatalogITCase.java | 26 +
 .../java/io/jdbc/catalog/PostgresCatalogTest.java  |  9 +++-
 .../io/jdbc/catalog/PostgresCatalogTestBase.java   | 62 ++
 6 files changed, 144 insertions(+), 3 deletions(-)

diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java
index 06a6329..abe753f 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java
@@ -54,7 +54,10 @@ public abstract class AbstractJDBCRowConverter implements 
JDBCRowConverter {
return reuse;
}
 
-   private JDBCFieldConverter createConverter(LogicalType type) {
+   /**
+* Create a runtime JDBC field converter from given {@link LogicalType}.
+*/
+   public JDBCFieldConverter createConverter(LogicalType type) {
LogicalTypeRoot root = type.getTypeRoot();
 
if (root == LogicalTypeRoot.SMALLINT) {
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java
index 3f997b4..e89fbc3 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java
@@ -43,6 +43,6 @@ public interface JDBCRowConverter extends Serializable {
 */
@FunctionalInterface
interface JDBCFieldConverter extends Serializable {
-   Object convert(Object value);
+   Object convert(Object value) throws SQLException;
}
 }
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java
index 102f079..82b6be8 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java
@@ -18,7 +18,15 @@
 
 package org.apache.flink.api.java.io.jdbc.source.row.converter;
 
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import org.postgresql.jdbc.PgArray;
+import org.postgresql.util.PGobject;
 
 /**
  * Row converter for Postgres.
@@ -28,4 +36,39 @@ public class PostgresRowConverter extends 
AbstractJDBCRowConverter {
public PostgresRowConverter(RowType rowType) {
super(rowType);
}
+
+   @Override
+   public JDBCFieldConverter createConverter(LogicalType type) {
+   LogicalTypeRoot root = type.getTypeRoot();
+
+   if (root == LogicalTypeRoot.ARRAY) {
+   ArrayType arrayType = (ArrayType) type;
+
+   // PG's bytea[] is wrapped in PGobject, rather than 
primitive byte arrays
+   if 
(LogicalTypeChecks.hasFamily(arrayType.getElementType(), 
LogicalTypeFamily.BINARY_STRING)) {
+
+   return v -> {
+   PgArray pgArray = (PgArray) v;
+   Object[] in = (Object[]) 
pgArray.getArray();
+
+   Object[] out = new 

[flink] branch master updated (c4b44e9 -> 6128bd1)

2020-04-24 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c4b44e9  [FLINK-13639] Refactor the IntermediateResultPartitionID to 
consist of IntermediateDataSetID and partitionIndex
 add 6128bd1  [FLINK-17175][core] StringUtils.arrayToString() should 
consider Object[] lastly

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/flink/util/StringUtils.java  |  6 +++---
 .../src/test/java/org/apache/flink/util/StringUtilsTest.java  | 11 +++
 2 files changed, 14 insertions(+), 3 deletions(-)



[flink] annotated tag release-1.10.1-rc1 updated (84b74cc -> ee391fd)

2020-04-24 Thread liyu
This is an automated email from the ASF dual-hosted git repository.

liyu pushed a change to annotated tag release-1.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git.


*** WARNING: tag release-1.10.1-rc1 was modified! ***

from 84b74cc  (commit)
  to ee391fd  (tag)
 tagging 84b74cc0e21981bf6feceb74b48d7a9d3e215dc5 (commit)
 replaces pre-apache-rename
  by Yu Li
  on Fri Apr 24 15:57:51 2020 +0800

- Log -
release-1.10.1-rc1
-BEGIN PGP SIGNATURE-
Version: GnuPG v2.0.22 (GNU/Linux)

iQIcBAABAgAGBQJeopv/AAoJEJPAeQJ3GrdDacgP/0GlJzrzRLIf3n4oVTr7ZpCl
m769HJCepjWoo5IlRl7PI5f7xdtRsB+145kOOvMAoru4X7GZzCLLoY6ZxDEIEqv6
xt/1SW78LxqqnfZdHBoMLM4IFzW1JWHT4q2sUjM2GfDly2UPbgQKtOul7bCzzytN
bEo9KwOGcsKmsT3RyVZ4ybJ5TGZ+aQkTSjPbwKbzUXhj62OlCCeHDvpovo6z70zW
EN3ZSNe/cRqDBpFOSQ72cCL1frNOt6g0Y4ip4ivEZSN9LG1HWCa5p9c0MsU1wUzW
hYLm+hjFT0AzMhiD1Yxe6oHzE/RlNs7WedCWwHs6SHPamMwRfhnLKnj2Ua0NJXmK
rCWYpPgfp0qfm7ti5hmUa75UxGzCbCnV0CD8UYsXVP5LBAK40khhP3QM2aQeBSzR
JHGykcNfKjqmuloZV06QjfljrRsARpMm0aEb7WzLMQdxpjp1EYUuFBKGaF0QAiZx
fAF0JYvsEjKu6r1t4sDlDUd6iOZsfIYAkrTRB3qkkg09Bk7+RuwjRM+53jTF1TwY
lJmhR6PPrhGmdFBrMRp+0PLNXUofClqFcqYI/6zbfoI5zcUd5Ix0cIVRMa2cqJ7D
IpD+VxxeqyuTgFWD/AW3WcTcCNRDJkXMQcR6Am2fODfQxTgUs0L7+ftS7lGPjMYb
cP8nDLVJWZQAtnHmpC9j
=N9Qu
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



svn commit: r39106 - /dev/flink/flink-1.10.1-rc1/

2020-04-24 Thread liyu
Author: liyu
Date: Fri Apr 24 16:32:13 2020
New Revision: 39106

Log:
Add flink-1.10.1-rc1

Added:
dev/flink/flink-1.10.1-rc1/
dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz   (with props)
dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.asc   (with props)
dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.sha512
dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz   (with props)
dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.asc   (with 
props)
dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.sha512
dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz   (with props)
dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.asc   (with 
props)
dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.sha512
dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz   (with props)
dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz.asc   (with props)
dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz.sha512

Added: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz
--
svn:mime-type = application/x-gzip

Added: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.asc
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.asc
--
svn:mime-type = application/pgp-signature

Added: dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.sha512
==
--- dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.sha512 (added)
+++ dev/flink/flink-1.10.1-rc1/apache-flink-1.10.1.tar.gz.sha512 Fri Apr 24 
16:32:13 2020
@@ -0,0 +1 @@
+2aa83b6cf9888bae6215e376379ab4bd3f9370b708e1bb4b6e526f4099116777b66c8aaad77986a02560999dbe93d0f3079c5903b365322fba085eb76e4afb98
  apache-flink-1.10.1.tar.gz

Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz
--
svn:mime-type = application/x-gzip

Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.asc
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.asc
--
svn:mime-type = application/pgp-signature

Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.sha512
==
--- dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.sha512 (added)
+++ dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.11.tgz.sha512 Fri Apr 
24 16:32:13 2020
@@ -0,0 +1 @@
+6fcbc3a9e3aa86e21c6a63ac66bb2522fb826e305ff72b77172fde70d7c5a000a2a94fe0cdd3d8b1d7fb2d2367eec0a8105cbfddc86c48aceabbfeece151c92a
  flink-1.10.1-bin-scala_2.11.tgz

Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz
--
svn:mime-type = application/x-gzip

Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.asc
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.asc
--
svn:mime-type = application/pgp-signature

Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.sha512
==
--- dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.sha512 (added)
+++ dev/flink/flink-1.10.1-rc1/flink-1.10.1-bin-scala_2.12.tgz.sha512 Fri Apr 
24 16:32:13 2020
@@ -0,0 +1 @@
+b71a0acff9c228c49df3fff0e011b8ee59e7f54e3436b43e0f0db7582676c2e6bc5dd11a8860e4b02125d4e8cd11fee64120022e1a961315ebd910f66d7543b1
  flink-1.10.1-bin-scala_2.12.tgz

Added: dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.10.1-rc1/flink-1.10.1-src.tgz

[flink] branch master updated: [FLINK-13639] Refactor the IntermediateResultPartitionID to consist of IntermediateDataSetID and partitionIndex

2020-04-24 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c4b44e9  [FLINK-13639] Refactor the IntermediateResultPartitionID to 
consist of IntermediateDataSetID and partitionIndex
c4b44e9 is described below

commit c4b44e9d4a7af9e841f785fcf0e4831f4cd5afe9
Author: Yangze Guo 
AuthorDate: Wed Apr 22 14:52:21 2020 +0800

[FLINK-13639] Refactor the IntermediateResultPartitionID to consist of 
IntermediateDataSetID and partitionIndex
---
 .../IntermediateResultPartition.java   |  2 +-
 .../runtime/io/network/netty/NettyMessage.java |  4 +-
 .../runtime/jobgraph/IntermediateDataSetID.java| 17 +++
 .../jobgraph/IntermediateResultPartitionID.java| 54 +-
 .../apache/flink/runtime/topology/ResultID.java|  2 +-
 5 files changed, 64 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 5cdd376b..78bd9c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -46,7 +46,7 @@ public class IntermediateResultPartition {
this.producer = producer;
this.partitionNumber = partitionNumber;
this.consumers = new ArrayList>(0);
-   this.partitionId = new IntermediateResultPartitionID();
+   this.partitionId = new 
IntermediateResultPartitionID(totalResult.getId(), partitionNumber);
}
 
public ExecutionVertex getProducer() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 2ed875d..5523cf2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -508,7 +508,7 @@ public abstract class NettyMessage {
ByteBuf result = null;
 
try {
-   result = allocateBuffer(allocator, ID, 16 + 16 
+ 4 + 16 + 4);
+   result = allocateBuffer(allocator, ID, 20 + 16 
+ 4 + 16 + 4);
 
partitionId.getPartitionId().writeTo(result);
partitionId.getProducerId().writeTo(result);
@@ -569,7 +569,7 @@ public abstract class NettyMessage {
// TODO Directly serialize to Netty's buffer
ByteBuffer serializedEvent = 
EventSerializer.toSerializedEvent(event);
 
-   result = allocateBuffer(allocator, ID, 4 + 
serializedEvent.remaining() + 16 + 16 + 16);
+   result = allocateBuffer(allocator, ID, 4 + 
serializedEvent.remaining() + 20 + 16 + 16);
 
result.writeInt(serializedEvent.remaining());
result.writeBytes(serializedEvent);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java
index c22ca1e..b34e3f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.jobgraph;
 import org.apache.flink.runtime.topology.ResultID;
 import org.apache.flink.util.AbstractID;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
 import java.util.UUID;
 
 /**
@@ -54,4 +56,19 @@ public class IntermediateDataSetID extends AbstractID 
implements ResultID {
public IntermediateDataSetID(UUID from) {
super(from.getLeastSignificantBits(), 
from.getMostSignificantBits());
}
+
+   private IntermediateDataSetID(long lower, long upper) {
+   super(lower, upper);
+   }
+
+   public void writeTo(ByteBuf buf) {
+   buf.writeLong(lowerPart);
+   buf.writeLong(upperPart);
+   }
+
+   public static IntermediateDataSetID fromByteBuf(ByteBuf buf) {
+   final long lower = buf.readLong();
+   final long upper = buf.readLong();
+   return new IntermediateDataSetID(lower, upper);
+   }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateResultPartitionID.java
 

[flink] branch master updated (90fed72 -> deb1268)

2020-04-24 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 90fed72  [FLINK-17345][python][table] Support register and get Python 
UDF in Catalog (#11884)
 add deb1268  [FLINK-17021][table-planner-blink] Blink batch planner set 
GlobalDataExchangeMode

No new revisions were added by this update.

Summary of changes:
 .../generated/execution_config_configuration.html  |  6 +-
 .../apache/flink/table/tpcds/TpcdsTestProgram.java |  3 -
 .../table/api/config/ExecutionConfigOptions.java   | 39 --
 .../flink/table/planner/utils/ExecutorUtils.java   | 23 ++
 .../table/planner/utils/ShuffleModeUtils.java  | 55 ++
 .../nodes/physical/batch/BatchExecExchange.scala   |  4 +-
 .../table/planner/utils/ShuffleModeUtilsTest.java  | 86 ++
 .../planner/runtime/utils/BatchTestBase.scala  |  6 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  6 +-
 9 files changed, 197 insertions(+), 31 deletions(-)
 create mode 100644 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShuffleModeUtils.java
 create mode 100644 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/ShuffleModeUtilsTest.java



[flink] branch master updated (efd22a7 -> 90fed72)

2020-04-24 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from efd22a7  [hotfix][filesystems] Remove unused StopWatch
 add 90fed72  [FLINK-17345][python][table] Support register and get Python 
UDF in Catalog (#11884)

No new revisions were added by this update.

Summary of changes:
 .../flink/table/catalog/hive/HiveCatalog.java  | 24 +++-
 .../catalog/hive/HiveCatalogMetadataTestBase.java  |  6 
 .../client/python/PythonFunctionFactoryTest.java   | 16 +++
 .../table/api/internal/TableEnvironmentImpl.java   |  4 ---
 .../flink/table/catalog/CatalogFunctionImpl.java   |  3 ++
 .../flink/table/catalog/FunctionCatalog.java   |  6 ++--
 .../table/catalog/GenericInMemoryCatalogTest.java  |  5 
 .../apache/flink/table/catalog/CatalogTest.java| 32 ++
 .../operations/SqlToOperationConverter.java|  6 
 .../table/sqlexec/SqlToOperationConverter.java |  6 
 .../flink/table/api/internal/TableEnvImpl.scala|  2 --
 11 files changed, 82 insertions(+), 28 deletions(-)



[flink] 05/05: [hotfix][filesystems] Remove unused StopWatch

2020-04-24 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit efd22a7cfc53d083b6c02941641d8f17f1c767ea
Author: Gary Yao 
AuthorDate: Thu Apr 23 19:00:49 2020 +0200

[hotfix][filesystems] Remove unused StopWatch
---
 .../flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java| 4 
 1 file changed, 4 deletions(-)

diff --git 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index 05e8cd0..adf123f 100644
--- 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.lang3.time.StopWatch;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -332,9 +331,6 @@ class HadoopRecoverableFsDataOutputStream extends 
RecoverableFsDataOutputStream
 
final Deadline deadline = 
Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
 
-   final StopWatch sw = new StopWatch();
-   sw.start();
-
boolean isClosed = dfs.isFileClosed(path);
while (!isClosed && deadline.hasTimeLeft()) {
try {



[flink] 01/05: [hotfix][runtime] Throw exception from TestingSchedulingPipelinedRegion#getVertex() for unknown vertices

2020-04-24 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd13c2c79002e436df3c430e79d010f13b567e2d
Author: Gary Yao 
AuthorDate: Tue Apr 21 15:40:11 2020 +0200

[hotfix][runtime] Throw exception from 
TestingSchedulingPipelinedRegion#getVertex() for unknown vertices
---
 .../scheduler/strategy/TestingSchedulingPipelinedRegion.java| 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
index 34450a7..2166fdc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingPipelinedRegion.java
@@ -52,7 +52,11 @@ public class TestingSchedulingPipelinedRegion implements 
SchedulingPipelinedRegi
 
@Override
public TestingSchedulingExecutionVertex getVertex(ExecutionVertexID 
vertexId) {
-   return regionVertices.get(vertexId);
+   final TestingSchedulingExecutionVertex executionVertex = 
regionVertices.get(vertexId);
+   if (executionVertex == null) {
+   throw new 
IllegalArgumentException(String.format("Execution vertex %s not found in 
pipelined region", vertexId));
+   }
+   return executionVertex;
}
 
@Override



[flink] 03/05: [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RegionPartitionReleaseStrategy

2020-04-24 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95b3c955f115dacb58b9695ae4192f729f5d5662
Author: Gary Yao 
AuthorDate: Tue Apr 21 15:23:40 2020 +0200

[FLINK-17180][runtime] Use SchedulingPipelinedRegion in 
RegionPartitionReleaseStrategy

Avoid re-computing pipelined regions in the
RegionPartitionReleaseStrategy using the PipelinedRegionComputeUtil.
Instead, rely on the pipelined regions provided by the Topology.
---
 .../failover/flip1/PipelinedRegionComputeUtil.java | 20 -
 .../flip1/partitionrelease/PipelinedRegion.java| 69 ---
 .../PipelinedRegionConsumedBlockingPartitions.java | 51 
 .../PipelinedRegionExecutionView.java  | 24 +++---
 .../RegionPartitionReleaseStrategy.java| 97 --
 .../RegionPartitionReleaseStrategyTest.java| 38 +
 .../PipelinedRegionExecutionViewTest.java  | 21 +++--
 7 files changed, 45 insertions(+), 275 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index aa94841..52d96f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
-import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.topology.BaseTopology;
 import org.apache.flink.runtime.topology.Result;
 import org.apache.flink.runtime.topology.Vertex;
@@ -34,8 +31,6 @@ import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
  * Utility for computing pipelined regions.
@@ -44,21 +39,6 @@ public final class PipelinedRegionComputeUtil {
 
private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
 
-   public static Set toPipelinedRegionsSet(
-   final Set> distinctRegions) {
-
-   return distinctRegions.stream()
-   .map(toExecutionVertexIdSet())
-   .map(PipelinedRegion::from)
-   .collect(Collectors.toSet());
-   }
-
-   private static Function, 
Set> toExecutionVertexIdSet() {
-   return failoverVertices -> failoverVertices.stream()
-   .map(SchedulingExecutionVertex::getId)
-   .collect(Collectors.toSet());
-   }
-
public static , R extends Result> Set> computePipelinedRegions(
final BaseTopology topology) {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
deleted file mode 100644
index 36c042e..000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;
-
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Set of execution vertices that are connected through pipelined intermediate 
result partitions.
- */

[flink] 02/05: [FLINK-17180][runtime] Implement SchedulingPipelinedRegion interface

2020-04-24 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 23c13bbdbfa1b538a6c9e4e9622ef4563f69cd03
Author: Gary Yao 
AuthorDate: Mon Apr 20 19:17:07 2020 +0200

[FLINK-17180][runtime] Implement SchedulingPipelinedRegion interface

Implement interfaces

  - Topology#getAllPipelinedRegions()
  - Topology#getPipelinedRegionOfVertex(ExecutionVertexID)

in DefaultExecutionTopology to enable retrieval of pipelined regions.
---
 .../adapter/DefaultExecutionTopology.java  |  49 
 .../adapter/DefaultSchedulingPipelinedRegion.java  |  85 ++
 .../adapter/DefaultExecutionTopologyTest.java  |  21 
 .../DefaultSchedulingPipelinedRegionTest.java  | 127 +
 4 files changed, 282 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
index 63bd39f..d7eb54e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
@@ -23,16 +23,23 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,6 +48,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class DefaultExecutionTopology implements SchedulingTopology {
 
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultExecutionTopology.class);
+
private final boolean containsCoLocationConstraints;
 
private final Map 
executionVerticesById;
@@ -49,6 +58,10 @@ public class DefaultExecutionTopology implements 
SchedulingTopology {
 
private final Map resultPartitionsById;
 
+   private final Map 
pipelinedRegionsByVertex;
+
+   private final List pipelinedRegions;
+
public DefaultExecutionTopology(ExecutionGraph graph) {
checkNotNull(graph, "execution graph can not be null");
 
@@ -74,6 +87,28 @@ public class DefaultExecutionTopology implements 
SchedulingTopology {
this.resultPartitionsById = tmpResultPartitionsById;
 
connectVerticesToConsumedPartitions(executionVertexMap, 
tmpResultPartitionsById);
+
+   this.pipelinedRegionsByVertex = new HashMap<>();
+   this.pipelinedRegions = new ArrayList<>();
+   initializePipelinedRegions();
+   }
+
+   private void initializePipelinedRegions() {
+   final long buildRegionsStartTime = System.nanoTime();
+
+   final Set> rawPipelinedRegions = 
PipelinedRegionComputeUtil.computePipelinedRegions(this);
+   for (Set 
rawPipelinedRegion : rawPipelinedRegions) {
+   //noinspection unchecked
+   final DefaultSchedulingPipelinedRegion pipelinedRegion 
= new DefaultSchedulingPipelinedRegion((Set) 
rawPipelinedRegion);
+   pipelinedRegions.add(pipelinedRegion);
+
+   for (SchedulingExecutionVertex executionVertex : 
rawPipelinedRegion) {
+   
pipelinedRegionsByVertex.put(executionVertex.getId(), pipelinedRegion);
+   }
+   }
+
+   final long buildRegionsDuration = (System.nanoTime() - 
buildRegionsStartTime) / 1_000_000;
+   LOG.info("Built {} pipelined regions in {} ms", 
pipelinedRegions.size(), buildRegionsDuration);
}
 
@Override
@@ -104,6 +139,20 @@ public class DefaultExecutionTopology implements 
SchedulingTopology {
return resultPartition;
}
 
+   @Override
+   public Iterable 
getAllPipelinedRegions() {
+   return Collections.unmodifiableCollection(pipelinedRegions);
+   }
+
+  

[flink] branch master updated (8514200 -> efd22a7)

2020-04-24 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 8514200  [FLINK-17308] Add regular cleanup task for ExecutionGraphCache
 new fd13c2c  [hotfix][runtime] Throw exception from 
TestingSchedulingPipelinedRegion#getVertex() for unknown vertices
 new 23c13bb  [FLINK-17180][runtime] Implement SchedulingPipelinedRegion 
interface
 new 95b3c95  [FLINK-17180][runtime] Use SchedulingPipelinedRegion in 
RegionPartitionReleaseStrategy
 new f9c23a0  [FLINK-17180][runtime] Use SchedulingPipelinedRegion in 
RestartPipelinedRegionFailoverStrategy
 new efd22a7  [hotfix][filesystems] Remove unused StopWatch

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hdfs/HadoopRecoverableFsDataOutputStream.java  |   4 -
 .../failover/flip1/FailoverRegion.java |  68 ---
 .../failover/flip1/PipelinedRegionComputeUtil.java |  20 
 .../RestartPipelinedRegionFailoverStrategy.java|  63 +++---
 .../flip1/partitionrelease/PipelinedRegion.java|  69 ---
 .../PipelinedRegionConsumedBlockingPartitions.java |  51 -
 .../PipelinedRegionExecutionView.java  |  24 ++--
 .../RegionPartitionReleaseStrategy.java|  97 +++-
 .../adapter/DefaultExecutionTopology.java  |  49 
 .../adapter/DefaultSchedulingPipelinedRegion.java  |  85 ++
 .../RegionPartitionReleaseStrategyTest.java|  38 +-
 ...ipelinedRegionFailoverStrategyBuildingTest.java | 125 ++--
 .../PipelinedRegionExecutionViewTest.java  |  21 ++--
 .../adapter/DefaultExecutionTopologyTest.java  |  21 
 .../DefaultSchedulingPipelinedRegionTest.java  | 127 +
 .../strategy/TestingSchedulingPipelinedRegion.java |   6 +-
 16 files changed, 410 insertions(+), 458 deletions(-)
 delete mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
 delete mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
 delete mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegion.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java



[flink] 04/05: [FLINK-17180][runtime] Use SchedulingPipelinedRegion in RestartPipelinedRegionFailoverStrategy

2020-04-24 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9c23a0b86121d6361df403a05f75ba4b3902735
Author: Gary Yao 
AuthorDate: Tue Apr 21 15:59:27 2020 +0200

[FLINK-17180][runtime] Use SchedulingPipelinedRegion in 
RestartPipelinedRegionFailoverStrategy

Avoid re-computing pipelined regions in the
RestartPipelinedRegionFailoverStrategy using the
PipelinedRegionComputeUtil. Instead, rely on the pipelined regions
provided by the Topology.

This closes #11857.
---
 .../failover/flip1/FailoverRegion.java |  68 ---
 .../RestartPipelinedRegionFailoverStrategy.java|  63 +++
 ...ipelinedRegionFailoverStrategyBuildingTest.java | 125 +++--
 3 files changed, 78 insertions(+), 178 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
deleted file mode 100644
index d1efb6f..000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph.failover.flip1;
-
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * FailoverRegion is a subset of all the vertices in the job topology.
- */
-public class FailoverRegion {
-
-   /** All vertex IDs in this region. */
-   private final Set executionVertexIDs;
-
-   /** All vertices in this region. */
-   private final Set 
executionVertices;
-
-   /**
-* Creates a new failover region containing a set of vertices.
-*
-* @param executionVertices to be contained in this region
-*/
-   public FailoverRegion(Set 
executionVertices) {
-   this.executionVertices = checkNotNull(executionVertices);
-   this.executionVertexIDs = new HashSet<>();
-   executionVertices.forEach(v -> 
this.executionVertexIDs.add(v.getId()));
-   }
-
-   /**
-* Returns IDs of all vertices in this region.
-*
-* @return IDs of all vertices in this region
-*/
-   public Set getAllExecutionVertexIDs() {
-   return executionVertexIDs;
-   }
-
-   /**
-* Returns all vertices in this region.
-*
-* @return all vertices in this region
-*/
-   public Set 
getAllExecutionVertices() {
-   return executionVertices;
-   }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
index 3c158f0..eb8d06b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionException;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.util.ExceptionUtils;
@@ -31,10 +32,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 import java.util.Collections;
-import 

[flink] branch release-1.9 updated (8c8fa36 -> 244ca66)

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 8c8fa36  [FLINK-17215][python][docs] Clean the build document for 
Pyflink (#11792)
 add 513d7b7  [hotfix] Remove unused docs.rest.NoOpTransientBlobService
 add 509da3a  [hotfix] Make NoOpTransientBlobService a public testing class
 add aa6966b  [FLINK-17308] Pass ScheduledExecutorService to 
WebMonitorEndpoint
 add eb75338  [FLINK-17308] Pass in ExecutionGraphCache to 
WebMonitorEndpoint
 add d90ec14  [FLINK-17308] Extract ExecutionGraphCache interface and 
rename impl into DefaultExecutionGraphCache
 add 244ca66  [FLINK-17308] Add regular cleanup task for ExecutionGraphCache

No new revisions were added by this update.

Summary of changes:
 .../runtime/dispatcher/DispatcherRestEndpoint.java |   7 +-
 ...tDispatcherResourceManagerComponentFactory.java |   4 +-
 .../jobmaster/MiniDispatcherRestEndpoint.java  |   7 +-
 .../flink/runtime/rest/JobRestEndpointFactory.java |   5 +-
 .../flink/runtime/rest/RestEndpointFactory.java|  14 ++-
 .../runtime/rest/SessionRestEndpointFactory.java   |   5 +-
 ...hCache.java => DefaultExecutionGraphCache.java} |  27 +
 .../rest/handler/legacy/ExecutionGraphCache.java   | 128 ++---
 .../runtime/webmonitor/WebMonitorEndpoint.java |  35 --
 .../runtime/blob}/NoOpTransientBlobService.java|  10 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   4 +-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |   4 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   4 +-
 ...st.java => DefaultExecutionGraphCacheTest.java} |  18 +--
 .../util/DocumentingDispatcherRestEndpoint.java|  58 +-
 .../rest/util/NoOpExecutionGraphCache.java}|  28 +++--
 .../webmonitor/TestingExecutionGraphCache.java | 113 ++
 .../runtime/webmonitor/WebMonitorEndpointTest.java |  79 +
 18 files changed, 307 insertions(+), 243 deletions(-)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCache.java
 => DefaultExecutionGraphCache.java} (83%)
 rename {flink-docs/src/main/java/org/apache/flink/docs/rest => 
flink-runtime/src/test/java/org/apache/flink/runtime/blob}/NoOpTransientBlobService.java
 (84%)
 rename 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCacheTest.java
 => DefaultExecutionGraphCacheTest.java} (93%)
 copy 
flink-runtime/src/{main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
 => test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java} 
(52%)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java



[flink] 02/02: [FLINK-17308] Add regular cleanup task for ExecutionGraphCache

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74c29879810c64100534e7892b5edc6e03bdc2d9
Author: Till Rohrmann 
AuthorDate: Thu Apr 23 10:53:27 2020 +0200

[FLINK-17308] Add regular cleanup task for ExecutionGraphCache

The WebMonitorEndpoint now schedules a regular cleanup task which
runs every 2 * WebOptions.REFRESH_INTERVAL and tries to clean up
expired ExecutionGraphCache entries. This ensures that we will remove
unused entries.

This closes #11882.
---
 .../runtime/webmonitor/WebMonitorEndpoint.java |  20 
 .../webmonitor/TestingExecutionGraphCache.java | 113 +
 .../runtime/webmonitor/WebMonitorEndpointTest.java |  79 ++
 3 files changed, 212 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 3d9e94e..ad98cc8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -127,6 +127,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -138,6 +139,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -167,6 +169,9 @@ public class WebMonitorEndpoint 
extends RestServerEndp
 
private final Collection archivingHandlers = new 
ArrayList<>(16);
 
+   @Nullable
+   private ScheduledFuture executionGraphCleanupTask;
+
public WebMonitorEndpoint(
RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever leaderRetriever,
@@ -682,13 +687,28 @@ public class WebMonitorEndpoint 
extends RestServerEndp
@Override
public void startInternal() throws Exception {
leaderElectionService.start(this);
+   startExecutionGraphCacheCleanupTask();
+
if (hasWebUI) {
log.info("Web frontend listening at {}.", 
getRestBaseUrl());
}
}
 
+   private void startExecutionGraphCacheCleanupTask() {
+   final long cleanupInterval = 2 * 
restConfiguration.getRefreshInterval();
+   executionGraphCleanupTask = executor.scheduleWithFixedDelay(
+   executionGraphCache::cleanup,
+   cleanupInterval,
+   cleanupInterval,
+   TimeUnit.MILLISECONDS);
+   }
+
@Override
protected CompletableFuture shutDownInternal() {
+   if (executionGraphCleanupTask != null) {
+   executionGraphCleanupTask.cancel(false);
+   }
+
executionGraphCache.close();
 
final CompletableFuture shutdownFuture = 
FutureUtils.runAfterwards(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
new file mode 100644
index 000..a61bdc3
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+
+import java.util.concurrent.CompletableFuture;
+import 

[flink] branch release-1.10 updated (0e2b520 -> 74c2987)

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 0e2b520  [FLINK-17338] Increase timeouts on LocalExecutorITCase
 add 0157501  [hotfix] Remove unused docs.rest.NoOpTransientBlobService
 add 85a9e7e  [hotfix] Make NoOpTransientBlobService a public testing class
 add 138b350  [FLINK-17308] Pass ScheduledExecutorService to 
WebMonitorEndpoint
 add c4ec62f  [FLINK-17308] Pass in ExecutionGraphCache to 
WebMonitorEndpoint
 new ba6b23b  [FLINK-17308] Extract ExecutionGraphCache interface and 
rename impl into DefaultExecutionGraphCache
 new 74c2987  [FLINK-17308] Add regular cleanup task for ExecutionGraphCache

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/dispatcher/DispatcherRestEndpoint.java |   7 +-
 ...tDispatcherResourceManagerComponentFactory.java |   4 +-
 .../jobmaster/MiniDispatcherRestEndpoint.java  |   7 +-
 .../flink/runtime/rest/JobRestEndpointFactory.java |   5 +-
 .../flink/runtime/rest/RestEndpointFactory.java|  14 ++-
 .../runtime/rest/SessionRestEndpointFactory.java   |   5 +-
 ...hCache.java => DefaultExecutionGraphCache.java} |  27 +
 .../rest/handler/legacy/ExecutionGraphCache.java   | 128 ++---
 .../runtime/webmonitor/WebMonitorEndpoint.java |  35 --
 .../runtime/blob}/NoOpTransientBlobService.java|  10 +-
 .../rest/handler/job/JobConfigHandlerTest.java |   4 +-
 .../rest/handler/job/JobExceptionsHandlerTest.java |   4 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   4 +-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |   4 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   4 +-
 .../metrics/JobVertexWatermarksHandlerTest.java|   4 +-
 ...st.java => DefaultExecutionGraphCacheTest.java} |  18 +--
 .../util/DocumentingDispatcherRestEndpoint.java|  58 +-
 .../util/NoOpExecutionGraphCache.java} |  27 +++--
 .../webmonitor/TestingExecutionGraphCache.java | 113 ++
 .../runtime/webmonitor/WebMonitorEndpointTest.java |  79 +
 21 files changed, 308 insertions(+), 253 deletions(-)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCache.java
 => DefaultExecutionGraphCache.java} (83%)
 rename {flink-docs/src/main/java/org/apache/flink/docs/rest => 
flink-runtime/src/test/java/org/apache/flink/runtime/blob}/NoOpTransientBlobService.java
 (84%)
 rename 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCacheTest.java
 => DefaultExecutionGraphCacheTest.java} (93%)
 copy 
flink-runtime/src/test/java/org/apache/flink/runtime/{dispatcher/NoOpJobGraphWriter.java
 => rest/util/NoOpExecutionGraphCache.java} (52%)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java



[flink] 01/02: [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ba6b23bc13cbe391a04089856cc6a64f4555f4f9
Author: Till Rohrmann 
AuthorDate: Thu Apr 23 10:38:15 2020 +0200

[FLINK-17308] Extract ExecutionGraphCache interface and rename impl into 
DefaultExecutionGraphCache

This commit extracts the ExecutionGraphCache interface from the 
implementation and renames
the latter into DefaultExecutionGraphCache. Moreover, it introduces the 
NoOpExecutionGraphCache
implementation which is used for the DocumentingDispatcherRestEndpoint.
---
 .../flink/runtime/rest/RestEndpointFactory.java|   3 +-
 ...hCache.java => DefaultExecutionGraphCache.java} |  27 +
 .../rest/handler/legacy/ExecutionGraphCache.java   | 128 ++---
 .../rest/handler/job/JobConfigHandlerTest.java |   4 +-
 .../rest/handler/job/JobExceptionsHandlerTest.java |   4 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   4 +-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |   4 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   4 +-
 .../metrics/JobVertexWatermarksHandlerTest.java|   4 +-
 ...st.java => DefaultExecutionGraphCacheTest.java} |  18 +--
 .../util/DocumentingDispatcherRestEndpoint.java|   3 +-
 .../runtime/rest/util/NoOpExecutionGraphCache.java |  50 
 12 files changed, 89 insertions(+), 164 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index d148c24..b2e21cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -52,7 +53,7 @@ public interface RestEndpointFactory {
FatalErrorHandler fatalErrorHandler) throws Exception;
 
static ExecutionGraphCache 
createExecutionGraphCache(RestHandlerConfiguration restConfiguration) {
-   return new ExecutionGraphCache(
+   return new DefaultExecutionGraphCache(
restConfiguration.getTimeout(),

Time.milliseconds(restConfiguration.getRefreshInterval()));
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
similarity index 83%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
index f634a62..a01704e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.util.Preconditions;
 
-import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -33,11 +32,9 @@ import java.util.function.Function;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink 
cluster. Every cache entry
- * has an associated time to live after which a new request will trigger the 
reloading of the
- * {@link ArchivedExecutionGraph} from the cluster.
+ * Default implementation of {@link ExecutionGraphCache}.
  */
-public class ExecutionGraphCache implements Closeable {
+public class DefaultExecutionGraphCache implements ExecutionGraphCache {
 
private final Time timeout;
 
@@ -47,7 +44,7 @@ public class ExecutionGraphCache implements Closeable {
 
private volatile boolean running = true;
 
-   public ExecutionGraphCache(
+   public DefaultExecutionGraphCache(
Time timeout,
Time timeToLive) {
this.timeout = checkNotNull(timeout);
@@ 

[flink] branch master updated (948a42f -> 8514200)

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 948a42f  [FLINK-17344][test] Fix unstability on IdleTime metric test.
 new e48fac8  [hotfix] Remove unused docs.rest.NoOpTransientBlobService
 new 5cfa396  [hotfix] Make NoOpTransientBlobService a public testing class
 new bf0cfa6  [FLINK-17308] Pass ScheduledExecutorService to 
WebMonitorEndpoint
 new 3635b54  [FLINK-17308] Pass in ExecutionGraphCache to 
WebMonitorEndpoint
 new 08025ec  [FLINK-17308] Extract ExecutionGraphCache interface and 
rename impl into DefaultExecutionGraphCache
 new 8514200  [FLINK-17308] Add regular cleanup task for ExecutionGraphCache

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/dispatcher/DispatcherRestEndpoint.java |   7 +-
 ...tDispatcherResourceManagerComponentFactory.java |   4 +-
 .../jobmaster/MiniDispatcherRestEndpoint.java  |   7 +-
 .../flink/runtime/rest/JobRestEndpointFactory.java |   5 +-
 .../flink/runtime/rest/RestEndpointFactory.java|  14 ++-
 .../runtime/rest/SessionRestEndpointFactory.java   |   5 +-
 ...hCache.java => DefaultExecutionGraphCache.java} |  27 +
 .../rest/handler/legacy/ExecutionGraphCache.java   | 128 ++---
 .../runtime/webmonitor/WebMonitorEndpoint.java |  36 --
 .../runtime/blob}/NoOpTransientBlobService.java|  10 +-
 .../rest/handler/job/JobConfigHandlerTest.java |   4 +-
 .../rest/handler/job/JobExceptionsHandlerTest.java |   4 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   4 +-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |   4 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   4 +-
 .../metrics/JobVertexWatermarksHandlerTest.java|   4 +-
 ...st.java => DefaultExecutionGraphCacheTest.java} |  18 +--
 .../util/DocumentingDispatcherRestEndpoint.java|  58 +-
 .../util/NoOpExecutionGraphCache.java} |  27 +++--
 .../webmonitor/TestingExecutionGraphCache.java | 113 ++
 .../runtime/webmonitor/WebMonitorEndpointTest.java |  79 +
 21 files changed, 309 insertions(+), 253 deletions(-)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCache.java
 => DefaultExecutionGraphCache.java} (83%)
 rename {flink-docs/src/main/java/org/apache/flink/docs/rest => 
flink-runtime/src/test/java/org/apache/flink/runtime/blob}/NoOpTransientBlobService.java
 (84%)
 rename 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/{ExecutionGraphCacheTest.java
 => DefaultExecutionGraphCacheTest.java} (93%)
 copy 
flink-runtime/src/test/java/org/apache/flink/runtime/{dispatcher/NoOpJobGraphWriter.java
 => rest/util/NoOpExecutionGraphCache.java} (52%)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java



[flink] 02/06: [hotfix] Make NoOpTransientBlobService a public testing class

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5cfa396eede2f714a2ee78953d4ab1ed5a152481
Author: Till Rohrmann 
AuthorDate: Thu Apr 23 09:46:56 2020 +0200

[hotfix] Make NoOpTransientBlobService a public testing class
---
 .../runtime/blob/NoOpTransientBlobService.java | 75 ++
 .../util/DocumentingDispatcherRestEndpoint.java| 55 +---
 2 files changed, 76 insertions(+), 54 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/NoOpTransientBlobService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/NoOpTransientBlobService.java
new file mode 100644
index 000..3f0bf2f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/NoOpTransientBlobService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * No-op implementation of {@link TransientBlobService}.
+ */
+public enum NoOpTransientBlobService implements TransientBlobService {
+   INSTANCE;
+
+   @Override
+   public File getFile(TransientBlobKey key) throws IOException {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public File getFile(JobID jobId, TransientBlobKey key) throws 
IOException {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public TransientBlobKey putTransient(byte[] value) throws IOException {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public TransientBlobKey putTransient(JobID jobId, byte[] value) throws 
IOException {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public TransientBlobKey putTransient(InputStream inputStream) throws 
IOException {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public TransientBlobKey putTransient(JobID jobId, InputStream 
inputStream) throws IOException {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean deleteFromCache(TransientBlobKey key) {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean deleteFromCache(JobID jobId, TransientBlobKey key) {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void close() throws IOException {}
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
index db45ee0..e727ab2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.runtime.rest.util;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.blob.TransientBlobKey;
-import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.blob.NoOpTransientBlobService;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -43,9 +41,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
 import javax.annotation.Nonnull;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.List;
 import java.util.UUID;
 import 

[flink] 03/06: [FLINK-17308] Pass ScheduledExecutorService to WebMonitorEndpoint

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bf0cfa606487c742a38cfccfee780a613894a8d8
Author: Till Rohrmann 
AuthorDate: Thu Apr 23 10:19:50 2020 +0200

[FLINK-17308] Pass ScheduledExecutorService to WebMonitorEndpoint
---
 .../flink/runtime/dispatcher/DispatcherRestEndpoint.java   |  4 ++--
 .../DefaultDispatcherResourceManagerComponentFactory.java  |  4 ++--
 .../flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java|  4 ++--
 .../org/apache/flink/runtime/rest/JobRestEndpointFactory.java  |  4 ++--
 .../org/apache/flink/runtime/rest/RestEndpointFactory.java |  4 ++--
 .../apache/flink/runtime/rest/SessionRestEndpointFactory.java  |  4 ++--
 .../apache/flink/runtime/webmonitor/WebMonitorEndpoint.java| 10 +-
 .../runtime/rest/util/DocumentingDispatcherRestEndpoint.java   |  2 +-
 8 files changed, 18 insertions(+), 18 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 0bbb10f..ba6f678 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -42,7 +42,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * REST endpoint for the {@link Dispatcher} component.
@@ -58,7 +58,7 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint 
resourceManagerRetriever,
TransientBlobService transientBlobService,
-   ExecutorService executor,
+   ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws IOException 
{
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index e1d1296..84b94a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -71,7 +71,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Abstract class which implements the creation of the {@link 
DispatcherResourceManagerComponent} components.
@@ -137,7 +137,7 @@ public class 
DefaultDispatcherResourceManagerComponentFactory implements Dispatc
10,
Time.milliseconds(50L));
 
-   final ExecutorService executor = 
WebMonitorEndpoint.createExecutorService(
+   final ScheduledExecutorService executor = 
WebMonitorEndpoint.createExecutorService(

configuration.getInteger(RestOptions.SERVER_NUM_THREADS),

configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
index f11b669..e9f7d61 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * REST endpoint for the {@link JobClusterEntrypoint}.
@@ -46,7 +46,7 @@ public class MiniDispatcherRestEndpoint extends 
WebMonitorEndpoint 
resourceManagerRetriever,
TransientBlobService transientBlobService,
-   ExecutorService executor,
+   ScheduledExecutorService executor,
   

[flink] 06/06: [FLINK-17308] Add regular cleanup task for ExecutionGraphCache

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 851420014584fdfd91308937cd913917c11cf4bd
Author: Till Rohrmann 
AuthorDate: Thu Apr 23 10:53:27 2020 +0200

[FLINK-17308] Add regular cleanup task for ExecutionGraphCache

The WebMonitorEndpoint now schedules a regular cleanup task which
runs every 2 * WebOptions.REFRESH_INTERVAL and tries to clean up
expired ExecutionGraphCache entries. This ensures that we will remove
unused entries.

This closes #11879.
---
 .../runtime/webmonitor/WebMonitorEndpoint.java |  21 
 .../webmonitor/TestingExecutionGraphCache.java | 113 +
 .../runtime/webmonitor/WebMonitorEndpointTest.java |  79 ++
 3 files changed, 213 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 7ccf12e..a41926b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -136,6 +136,8 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -146,6 +148,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -175,6 +178,9 @@ public class WebMonitorEndpoint 
extends RestServerEndp
 
private final Collection archivingHandlers = new 
ArrayList<>(16);
 
+   @Nullable
+   private ScheduledFuture executionGraphCleanupTask;
+
public WebMonitorEndpoint(
RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever leaderRetriever,
@@ -725,13 +731,28 @@ public class WebMonitorEndpoint 
extends RestServerEndp
@Override
public void startInternal() throws Exception {
leaderElectionService.start(this);
+   startExecutionGraphCacheCleanupTask();
+
if (hasWebUI) {
log.info("Web frontend listening at {}.", 
getRestBaseUrl());
}
}
 
+   private void startExecutionGraphCacheCleanupTask() {
+   final long cleanupInterval = 2 * 
restConfiguration.getRefreshInterval();
+   executionGraphCleanupTask = executor.scheduleWithFixedDelay(
+   executionGraphCache::cleanup,
+   cleanupInterval,
+   cleanupInterval,
+   TimeUnit.MILLISECONDS);
+   }
+
@Override
protected CompletableFuture shutDownInternal() {
+   if (executionGraphCleanupTask != null) {
+   executionGraphCleanupTask.cancel(false);
+   }
+
executionGraphCache.close();
 
final CompletableFuture shutdownFuture = 
FutureUtils.runAfterwards(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
new file mode 100644
index 000..a61bdc3
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingExecutionGraphCache.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+
+import java.util.concurrent.CompletableFuture;
+import 

[flink] 01/06: [hotfix] Remove unused docs.rest.NoOpTransientBlobService

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e48fac8cc0f0255261e4dcb55f7b50eea1608f5c
Author: Till Rohrmann 
AuthorDate: Thu Apr 23 09:44:51 2020 +0200

[hotfix] Remove unused docs.rest.NoOpTransientBlobService
---
 .../flink/docs/rest/NoOpTransientBlobService.java  | 77 --
 1 file changed, 77 deletions(-)

diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/NoOpTransientBlobService.java
 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/NoOpTransientBlobService.java
deleted file mode 100644
index 1d307c4..000
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/NoOpTransientBlobService.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.docs.rest;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.TransientBlobKey;
-import org.apache.flink.runtime.blob.TransientBlobService;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * No-op implementation of {@link TransientBlobService} used by the {@link 
RestAPIDocGenerator}.
- */
-enum NoOpTransientBlobService implements TransientBlobService {
-   INSTANCE;
-
-   @Override
-   public File getFile(TransientBlobKey key) throws IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public File getFile(JobID jobId, TransientBlobKey key) throws 
IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public TransientBlobKey putTransient(byte[] value) throws IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public TransientBlobKey putTransient(JobID jobId, byte[] value) throws 
IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public TransientBlobKey putTransient(InputStream inputStream) throws 
IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public TransientBlobKey putTransient(JobID jobId, InputStream 
inputStream) throws IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public boolean deleteFromCache(TransientBlobKey key) {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public boolean deleteFromCache(JobID jobId, TransientBlobKey key) {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void close() throws IOException {}
-}



[flink] 05/06: [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 08025ec5d59659adea517d3f363abedd00c4fe7c
Author: Till Rohrmann 
AuthorDate: Thu Apr 23 10:38:15 2020 +0200

[FLINK-17308] Extract ExecutionGraphCache interface and rename impl into 
DefaultExecutionGraphCache

This commit extracts the ExecutionGraphCache interface from the 
implementation and renames
the latter into DefaultExecutionGraphCache. Moreover, it introduces the 
NoOpExecutionGraphCache
implementation which is used for the DocumentingDispatcherRestEndpoint.
---
 .../flink/runtime/rest/RestEndpointFactory.java|   3 +-
 ...hCache.java => DefaultExecutionGraphCache.java} |  27 +
 .../rest/handler/legacy/ExecutionGraphCache.java   | 128 ++---
 .../rest/handler/job/JobConfigHandlerTest.java |   4 +-
 .../rest/handler/job/JobExceptionsHandlerTest.java |   4 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   4 +-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |   4 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   4 +-
 .../metrics/JobVertexWatermarksHandlerTest.java|   4 +-
 ...st.java => DefaultExecutionGraphCacheTest.java} |  18 +--
 .../util/DocumentingDispatcherRestEndpoint.java|   3 +-
 .../runtime/rest/util/NoOpExecutionGraphCache.java |  50 
 12 files changed, 89 insertions(+), 164 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index d148c24..b2e21cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -52,7 +53,7 @@ public interface RestEndpointFactory {
FatalErrorHandler fatalErrorHandler) throws Exception;
 
static ExecutionGraphCache 
createExecutionGraphCache(RestHandlerConfiguration restConfiguration) {
-   return new ExecutionGraphCache(
+   return new DefaultExecutionGraphCache(
restConfiguration.getTimeout(),

Time.milliseconds(restConfiguration.getRefreshInterval()));
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
similarity index 83%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
index f634a62..a01704e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.util.Preconditions;
 
-import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -33,11 +32,9 @@ import java.util.function.Function;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink 
cluster. Every cache entry
- * has an associated time to live after which a new request will trigger the 
reloading of the
- * {@link ArchivedExecutionGraph} from the cluster.
+ * Default implementation of {@link ExecutionGraphCache}.
  */
-public class ExecutionGraphCache implements Closeable {
+public class DefaultExecutionGraphCache implements ExecutionGraphCache {
 
private final Time timeout;
 
@@ -47,7 +44,7 @@ public class ExecutionGraphCache implements Closeable {
 
private volatile boolean running = true;
 
-   public ExecutionGraphCache(
+   public DefaultExecutionGraphCache(
Time timeout,
Time timeToLive) {
this.timeout = checkNotNull(timeout);
@@ -64,22 

[flink] 04/06: [FLINK-17308] Pass in ExecutionGraphCache to WebMonitorEndpoint

2020-04-24 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3635b54c7b49ea48936ae4baff91a20a96e2f96c
Author: Till Rohrmann 
AuthorDate: Thu Apr 23 10:26:06 2020 +0200

[FLINK-17308] Pass in ExecutionGraphCache to WebMonitorEndpoint
---
 .../apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java  | 3 +++
 .../flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java  | 3 +++
 .../org/apache/flink/runtime/rest/JobRestEndpointFactory.java| 1 +
 .../java/org/apache/flink/runtime/rest/RestEndpointFactory.java  | 9 +
 .../apache/flink/runtime/rest/SessionRestEndpointFactory.java| 1 +
 .../org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java  | 5 ++---
 .../runtime/rest/util/DocumentingDispatcherRestEndpoint.java | 2 ++
 7 files changed, 21 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index ba6f678..870cb5a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
@@ -61,6 +62,7 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint {
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws Exception;
+
+   static ExecutionGraphCache 
createExecutionGraphCache(RestHandlerConfiguration restConfiguration) {
+   return new ExecutionGraphCache(
+   restConfiguration.getTimeout(),
+   
Time.milliseconds(restConfiguration.getRefreshInterval()));
+   }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
index b091197..c59d220 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -60,6 +60,7 @@ public enum SessionRestEndpointFactory implements 
RestEndpointFactory 
extends RestServerEndp
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
+   ExecutionGraphCache executionGraphCache,
FatalErrorHandler fatalErrorHandler) throws IOException 
{
super(endpointConfiguration);
this.leaderRetriever = 
Preconditions.checkNotNull(leaderRetriever);
@@ -194,9 +195,7 @@ public class WebMonitorEndpoint 
extends RestServerEndp
this.transientBlobService = 
Preconditions.checkNotNull(transientBlobService);
this.executor = Preconditions.checkNotNull(executor);
 
-   this.executionGraphCache = new ExecutionGraphCache(
-   restConfiguration.getTimeout(),
-   
Time.milliseconds(restConfiguration.getRefreshInterval()));
+   this.executionGraphCache = executionGraphCache;
 
this.checkpointStatsCache = new CheckpointStatsCache(

restConfiguration.getMaxCheckpointStatisticCacheEntries());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
index c3db3c3..9f84bce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.RestEndpointFactory;

[flink] branch master updated (eeaf08f -> 948a42f)

2020-04-24 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from eeaf08f  [FLINK-16667][python][client] Support the Python dependency 
configuration options in CliFrontend (#11702)
 add 948a42f  [FLINK-17344][test] Fix unstability on IdleTime metric test.

No new revisions were added by this update.

Summary of changes:
 .../flink/runtime/io/network/api/writer/RecordWriterTest.java | 11 ---
 .../runtime/tasks/mailbox/TaskMailboxProcessorTest.java   |  4 
 2 files changed, 8 insertions(+), 7 deletions(-)



[flink] branch master updated (bf39d75 -> eeaf08f)

2020-04-24 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from bf39d75  [FLINK-17169][table-blink] Refactor BaseRow to use RowKind 
instead of byte header
 add eeaf08f  [FLINK-16667][python][client] Support the Python dependency 
configuration options in CliFrontend (#11702)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/client/cli/CliFrontend.java   |  39 ++--
 .../apache/flink/client/cli/CliFrontendParser.java |   8 +-
 .../flink/client/cli/ExecutionConfigAccessor.java  |  12 +--
 .../apache/flink/client/cli/ProgramOptions.java| 110 ++---
 .../flink/client/cli/ProgramOptionsUtils.java  |  96 ++
 .../flink/client/program/PackagedProgram.java  |  35 +--
 .../flink/client/program/PackagedProgramUtils.java |  42 
 .../client/cli/CliFrontendPackageProgramTest.java  |  12 +--
 .../flink/client/cli/CliFrontendRunTest.java   |  27 +
 .../flink/client/cli/PythonProgramOptions.java |  97 ++
 .../flink/python/util/PythonDependencyUtils.java   |  23 +
 .../flink/client/cli/PythonProgramOptionsTest.java | 107 
 .../python/util/PythonDependencyUtilsTest.java |   6 +-
 .../client/gateway/local/ExecutionContext.java |   2 +-
 14 files changed, 442 insertions(+), 174 deletions(-)
 create mode 100644 
flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
 create mode 100644 
flink-python/src/main/java/org/apache/flink/client/cli/PythonProgramOptions.java
 create mode 100644 
flink-python/src/test/java/org/apache/flink/client/cli/PythonProgramOptionsTest.java



[flink] 03/04: [FLINK-17169][table-blink] Fix allowLateness shouldn't affect producing updates of emit strategy

2020-04-24 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 670ef41ed59dbf7069e313e5b0f8b373bd4245b4
Author: Jark Wu 
AuthorDate: Tue Apr 21 00:04:12 2020 +0800

[FLINK-17169][table-blink] Fix allowLateness shouldn't affect producing 
updates of emit strategy

This closes #11797
---
 .../planner/plan/utils/WindowEmitStrategy.scala|   2 +-
 .../plan/stream/sql/agg/WindowAggregateTest.xml| 272 -
 .../plan/stream/sql/agg/WindowAggregateTest.scala  |  28 +++
 .../operators/window/WindowOperatorBuilder.java|   2 -
 4 files changed, 191 insertions(+), 113 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
index eff8a48..07b30f4 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
@@ -63,7 +63,7 @@ class WindowEmitStrategy(
 
   def produceUpdates: Boolean = {
 if (isEventTime) {
-  allowLateness > 0 || earlyFireDelayEnabled || lateFireDelayEnabled
+  earlyFireDelayEnabled || lateFireDelayEnabled
 } else {
   earlyFireDelayEnabled
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 13553e4..2459697 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -134,6 +134,32 @@ Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS 
EXPR$3])
 ]]>
 
   
+  
+
+  
+
+
+  
+
+
+  
+
+  
   
 
   
 
   
-  
-
-  
-
-
-  
-
-
-  
-
-  
-  
-
-  
-
-
-  
-
-
-  
-
-  
   
 
   
 
   
+  
+
+  
+
+
+  
+
+
+  
+
+  
   
 
   
 
   
-  
+  
 
-  
+  
 
 
   
 
 
   
 
   
-  
+  
 
   
 
 
   
 
 
   
+
+  
+  
+
+  
+
+
+  
+
+
+  
 
   
@@ -638,4 +664,30 @@ Union(all=[true], union=[EXPR$0])
 ]]>
 
   
+  
+
+  
+
+
+  
+
+
+  
+
+  
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index 83d5544..fe66dd4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.planner.plan.stream.sql.agg
 
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
+import 
org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_LATE_FIRE_DELAY,
 TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
 import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.junit.Test
@@ -414,4 +416,30 @@ class WindowAggregateTest extends TableTestBase {
 
 util.verifyPlan(sql)
   }
+
+  @Test
+  def testWindowAggregateWithLateFire(): Unit = {
+util.conf.getConfiguration.setBoolean(TABLE_EXEC_EMIT_LATE_FIRE_ENABLED, 
true)
+util.conf.getConfiguration.setString(TABLE_EXEC_EMIT_LATE_FIRE_DELAY, "5s")
+util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+val sql =
+  """
+|SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
+|FROM MyTable
+|GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
+|""".stripMargin
+util.verifyPlanWithTrait(sql)
+  }
+
+  @Test
+  def testWindowAggregateWithAllowLatenessOnly(): Unit = {
+util.conf.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+val sql =
+  """
+|SELECT 

[flink] branch master updated (26af121 -> bf39d75)

2020-04-24 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 26af121  [hotfix][docs] link tags in zh docs require .zh.md links
 new 91f4a3c  [FLINK-17169][core] Add shortString() method to RowKind to 
get short string representation
 new 7662e0b  [FLINK-17169][table-planner-blink] Rename 
StreamRecordUtils#record() to insertRecord()
 new 670ef41  [FLINK-17169][table-blink] Fix allowLateness shouldn't affect 
producing updates of emit strategy
 new bf39d75  [FLINK-17169][table-blink] Refactor BaseRow to use RowKind 
instead of byte header

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/flink/types/RowKind.java  |  67 +++-
 ...bstractBaseRowPythonScalarFunctionOperator.java |   2 +-
 .../BaseRowPythonScalarFunctionOperator.java   |   2 +-
 .../BaseRowArrowPythonScalarFunctionOperator.java  |   2 +-
 .../table/BaseRowPythonTableFunctionOperator.java  |   4 +-
 .../BaseRowPythonScalarFunctionOperatorTest.java   |   6 +-
 ...seRowArrowPythonScalarFunctionOperatorTest.java |   6 +-
 .../BaseRowPythonTableFunctionOperatorTest.java|   8 +-
 .../table/planner/codegen/CalcCodeGenerator.scala  |   2 +-
 .../flink/table/planner/codegen/CodeGenUtils.scala |   6 +-
 .../planner/codegen/CorrelateCodeGenerator.scala   |  12 +-
 .../planner/codegen/EqualiserCodeGenerator.scala   |   2 +-
 .../planner/codegen/ExpandCodeGenerator.scala  |   2 +-
 .../planner/codegen/LookupJoinCodeGenerator.scala  |   2 +-
 .../codegen/agg/AggsHandlerCodeGenerator.scala |   4 +-
 .../physical/stream/StreamExecDeduplicate.scala|  25 +-
 .../StreamExecGroupWindowAggregateBase.scala   |   2 +-
 .../planner/plan/utils/WindowEmitStrategy.scala|   2 +-
 .../flink/table/planner/utils/BaseRowTestUtil.java |   6 +-
 .../plan/stream/sql/agg/WindowAggregateTest.xml| 272 --
 .../plan/stream/sql/agg/WindowAggregateTest.scala  |  28 ++
 ...AbstractTwoInputStreamOperatorWithTTLTest.scala |   6 +-
 .../harness/GroupAggregateHarnessTest.scala|  68 ++--
 .../harness/TableAggregateHarnessTest.scala|  90 ++---
 .../planner/runtime/stream/sql/CalcITCase.scala|   4 +-
 .../planner/runtime/stream/sql/RankITCase.scala|   8 +-
 .../planner/runtime/stream/sql/ValuesITCase.scala  |   2 +-
 .../table/planner/runtime/utils/TestSinkUtil.scala |   3 +-
 .../org/apache/flink/table/dataformat/BaseRow.java |  15 +-
 .../apache/flink/table/dataformat/BinaryRow.java   |  19 +-
 .../flink/table/dataformat/BinaryRowWriter.java|   6 +-
 .../apache/flink/table/dataformat/ColumnarRow.java |  11 +-
 .../apache/flink/table/dataformat/GenericRow.java  |   2 +-
 .../apache/flink/table/dataformat/JoinedRow.java   |  12 +-
 .../apache/flink/table/dataformat/NestedRow.java   |  11 +-
 .../flink/table/dataformat/ObjectArrayRow.java |  24 +-
 .../flink/table/dataformat/UpdatableRow.java   |  10 +-
 .../flink/table/dataformat/util/BaseRowUtil.java   |  33 +-
 .../operators/aggregate/GroupAggFunction.java  |  18 +-
 .../aggregate/MiniBatchGlobalGroupAggFunction.java |  27 +-
 .../aggregate/MiniBatchGroupAggFunction.java   |  22 +-
 .../deduplicate/DeduplicateFunctionHelper.java |  48 ++-
 .../DeduplicateKeepLastRowFunction.java|  20 +-
 .../MiniBatchDeduplicateKeepLastRowFunction.java   |   5 +-
 .../join/lookup/AsyncLookupJoinRunner.java |   4 +-
 .../operators/join/lookup/LookupJoinRunner.java|   2 +-
 .../join/stream/StreamingJoinOperator.java |  72 ++--
 .../join/stream/StreamingSemiAntiJoinOperator.java |  53 +--
 .../temporal/TemporalProcessTimeJoinOperator.java  |   2 +-
 .../join/temporal/TemporalRowTimeJoinOperator.java |   5 +-
 .../operators/rank/AbstractTopNFunction.java   |  54 +--
 .../operators/rank/AppendOnlyTopNFunction.java |  33 +-
 .../operators/rank/RetractableTopNFunction.java|  77 ++--
 .../operators/rank/UpdatableTopNFunction.java  |  95 ++---
 .../runtime/operators/sort/StreamSortOperator.java |   3 +-
 .../operators/window/AggregateWindowOperator.java  |  39 +-
 .../window/TableAggregateWindowOperator.java   |   4 +-
 .../runtime/operators/window/WindowOperator.java   |  12 +-
 .../operators/window/WindowOperatorBuilder.java|  18 +-
 .../table/runtime/typeutils/BaseRowSerializer.java |   4 +-
 .../apache/flink/table/dataformat/BaseRowTest.java |   7 +-
 .../flink/table/dataformat/BinaryRowTest.java  |  13 +-
 .../DeduplicateKeepFirstRowFunctionTest.java   |  32 +-
 .../DeduplicateKeepLastRowFunctionTest.java|  95 +++--
 ...niBatchDeduplicateKeepFirstRowFunctionTest.java |  32 +-
 ...iniBatchDeduplicateKeepLastRowFunctionTest.java | 123 ---
 

[flink] 01/04: [FLINK-17169][core] Add shortString() method to RowKind to get short string representation

2020-04-24 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91f4a3c5081a99822430ba0683bce381aec188bd
Author: Jark Wu 
AuthorDate: Fri Apr 17 22:11:44 2020 +0800

[FLINK-17169][core] Add shortString() method to RowKind to get short string 
representation

This closes #11797
---
 .../main/java/org/apache/flink/types/RowKind.java  | 67 --
 1 file changed, 63 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/types/RowKind.java 
b/flink-core/src/main/java/org/apache/flink/types/RowKind.java
index 3b45001..a1acf7d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/RowKind.java
+++ b/flink-core/src/main/java/org/apache/flink/types/RowKind.java
@@ -29,7 +29,7 @@ public enum RowKind {
/**
 * Insertion operation.
 */
-   INSERT,
+   INSERT("+I", (byte) 0),
 
/**
 * Update operation with the previous content of the updated row.
@@ -38,7 +38,7 @@ public enum RowKind {
 * to retract the previous row first. It is useful in cases of a 
non-idempotent update, i.e., an
 * update of a row that is not uniquely identifiable by a key.
 */
-   UPDATE_BEFORE,
+   UPDATE_BEFORE("-U", (byte) 1),
 
/**
 * Update operation with new content of the updated row.
@@ -47,10 +47,69 @@ public enum RowKind {
 * needs to retract the previous row first. OR it describes an 
idempotent update, i.e., an update
 * of a row that is uniquely identifiable by a key.
 */
-   UPDATE_AFTER,
+   UPDATE_AFTER("+U", (byte) 2),
 
/**
 * Deletion operation.
 */
-   DELETE
+   DELETE("-D", (byte) 3);
+
+   private final String shortString;
+
+   private final byte value;
+
+   /**
+* Creates a {@link RowKind} enum with the given short string and byte 
value representation of
+* the {@link RowKind}.
+*/
+   RowKind(String shortString, byte value) {
+   this.shortString = shortString;
+   this.value = value;
+   }
+
+   /**
+* Returns a short string representation of this {@link RowKind}.
+*
+* 
+* "+I" represents {@link #INSERT}.
+* "-U" represents {@link #UPDATE_BEFORE}.
+* "+U" represents {@link #UPDATE_AFTER}.
+* "-D" represents {@link #DELETE}.
+* 
+*/
+   public String shortString() {
+   return shortString;
+   }
+
+   /**
+* Returns the byte value representation of this {@link RowKind}. The 
byte value is used
+* for serialization and deserialization.
+*
+* 
+*   "0" represents {@link #INSERT}.
+*   "1" represents {@link #UPDATE_BEFORE}.
+*   "2" represents {@link #UPDATE_AFTER}.
+*   "3" represents {@link #DELETE}.
+* 
+*/
+   public byte toByteValue() {
+   return value;
+   }
+
+   /**
+* Creates a {@link RowKind} from the given byte value. Each {@link 
RowKind} has a byte
+* value representation.
+*
+* @see #toByteValue() for mapping of byte value and {@link RowKind}.
+*/
+   public static RowKind fromByteValue(byte value) {
+   switch (value) {
+   case 0: return INSERT;
+   case 1: return UPDATE_BEFORE;
+   case 2: return UPDATE_AFTER;
+   case 3: return DELETE;
+   default: throw new UnsupportedOperationException(
+   "Unsupported byte value '" + value + "' for row 
kind.");
+   }
+   }
 }



[flink-docker] branch master updated: [FLINK-17346] Deduplicate process setup

2020-04-24 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 887ac36  [FLINK-17346] Deduplicate process setup
887ac36 is described below

commit 887ac36c2b9611095f989eb0cff527fa196f0456
Author: Chesnay Schepler 
AuthorDate: Thu Apr 23 10:54:17 2020 +0200

[FLINK-17346] Deduplicate process setup
---
 testing/testing_lib.sh | 104 +
 1 file changed, 26 insertions(+), 78 deletions(-)

diff --git a/testing/testing_lib.sh b/testing/testing_lib.sh
index 504029d..36dfe32 100644
--- a/testing/testing_lib.sh
+++ b/testing/testing_lib.sh
@@ -45,51 +45,42 @@ function build_image() {
 docker build -t "$image_name" "$dockerfile_dir"
 }
 
-function run_jobmanager() {
-local dockerfile
-dockerfile="$1"
+function internal_run() {
+local dockerfile="$1"
+local docker_run_command="$2"
+local args="$3"
 
 local image_tag image_name
 image_tag="$(image_tag "$dockerfile")"
 image_name="$(image_name "$image_tag")"
 
-echo >&2 "===> Starting ${image_tag} jobmanager..."
-
-# Prints container ID
-docker run \
---rm \
---detach \
---name "jobmanager" \
---network "$NETWORK_NAME" \
---publish 6123:6123 \
---publish 8081:8081 \
--e JOB_MANAGER_RPC_ADDRESS="jobmanager" \
-"$image_name" \
-jobmanager
+echo >&2 "===> Starting ${image_tag} ${args}..."
+
+eval "docker run --rm --detach --network $NETWORK_NAME -e 
JOB_MANAGER_RPC_ADDRESS=jobmanager ${docker_run_command} $image_name ${args}"
+}
+
+function internal_run_jobmanager() {
+internal_run "$1" "--name jobmanager --publish 6123:6123 --publish 
8081:8081 $2" jobmanager
+}
+
+function run_jobmanager() {
+internal_run_jobmanager "$1" ""
 }
 
 function run_jobmanager_non_root() {
-local dockerfile
-dockerfile="$1"
+internal_run_jobmanager "$1" "--user flink"
+}
 
-local image_tag image_name
-image_tag="$(image_tag "$dockerfile")"
-image_name="$(image_name "$image_tag")"
+function internal_run_taskmanager() {
+internal_run "$1" "--name taskmanager $2" "taskmanager"
+}
 
-echo >&2 "===> Starting ${image_tag} jobmanager as non-root..."
-
-# Prints container ID
-docker run \
---rm \
---detach \
---name "jobmanager" \
---network "$NETWORK_NAME" \
---user flink \
---publish 6123:6123 \
---publish 8081:8081 \
--e JOB_MANAGER_RPC_ADDRESS="jobmanager" \
-"$image_name" \
-jobmanager
+function run_taskmanager() {
+  internal_run_taskmanager "$1" ""
+}
+
+function run_taskmanager_non_root() {
+  internal_run_taskmanager "$1" "--user flink"
 }
 
 function wait_for_jobmanager() {
@@ -131,49 +122,6 @@ function wait_for_jobmanager() {
 echo >&2 "===> ${image_tag} jobmanager is ready."
 }
 
-function run_taskmanager() {
-local dockerfile
-dockerfile="$1"
-
-local image_tag image_name
-image_tag="$(image_tag "$dockerfile")"
-image_name="$(image_name "$image_tag")"
-
-echo >&2 "===> Starting ${image_tag} taskmanager..."
-
-# Prints container ID
-docker run \
---rm \
---detach \
---name "taskmanager" \
---network "$NETWORK_NAME" \
--e JOB_MANAGER_RPC_ADDRESS="jobmanager" \
-"$image_name" \
-taskmanager
-}
-
-function run_taskmanager_non_root() {
-local dockerfile
-dockerfile="$1"
-
-local image_tag image_name
-image_tag="$(image_tag "$dockerfile")"
-image_name="$(image_name "$image_tag")"
-
-echo >&2 "===> Starting ${image_tag} taskmanager as non-root..."
-
-# Prints container ID
-docker run \
---rm \
---detach \
---name "taskmanager" \
---network "$NETWORK_NAME" \
---user flink \
--e JOB_MANAGER_RPC_ADDRESS="jobmanager" \
-"$image_name" \
-taskmanager
-}
-
 function test_image() {
 local dockerfile
 dockerfile="$1"



[flink] branch master updated: [hotfix][docs] link tags in zh docs require .zh.md links

2020-04-24 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 26af121  [hotfix][docs] link tags in zh docs require .zh.md links
26af121 is described below

commit 26af1215d46089f3c66407e108ede9968cb9b8f7
Author: David Anderson 
AuthorDate: Fri Apr 24 09:23:06 2020 +0200

[hotfix][docs] link tags in zh docs require .zh.md links
---
 docs/dev/event_time.zh.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/dev/event_time.zh.md b/docs/dev/event_time.zh.md
index a25122f..dbe964f 100644
--- a/docs/dev/event_time.zh.md
+++ b/docs/dev/event_time.zh.md
@@ -26,13 +26,13 @@ under the License.
 
 In this section you will learn about writing time-aware Flink programs. Please
 take a look at [Timely Stream Processing]({% link
-concepts/timely-stream-processing.md %}) to learn about the concepts behind
+concepts/timely-stream-processing.zh.md %}) to learn about the concepts behind
 timely stream processing.
 
 For information about how to use time in Flink programs refer to
-[windowing]({% link dev/stream/operators/windows.md %}) and
+[windowing]({% link dev/stream/operators/windows.zh.md %}) and
 [ProcessFunction]({% link
-dev/stream/operators/process_function.md %}).
+dev/stream/operators/process_function.zh.md %}).
 
 * toc
 {:toc}



[flink-docker] branch master updated: [FLINK-17187] Add utility method for setting config options

2020-04-24 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 379970e  [FLINK-17187] Add utility method for setting config options
379970e is described below

commit 379970e8d2a9e138d1291d83b47e7ea643421b3a
Author: Chesnay Schepler 
AuthorDate: Thu Apr 16 13:34:29 2020 +0200

[FLINK-17187] Add utility method for setting config options
---
 1.10/scala_2.11-debian/docker-entrypoint.sh | 64 +++--
 1.10/scala_2.12-debian/docker-entrypoint.sh | 64 +++--
 1.9/scala_2.11-debian/docker-entrypoint.sh  | 64 +++--
 1.9/scala_2.12-debian/docker-entrypoint.sh  | 64 +++--
 docker-entrypoint.sh| 64 +++--
 5 files changed, 120 insertions(+), 200 deletions(-)

diff --git a/1.10/scala_2.11-debian/docker-entrypoint.sh 
b/1.10/scala_2.11-debian/docker-entrypoint.sh
index 12e5d5a..011ccea 100755
--- a/1.10/scala_2.11-debian/docker-entrypoint.sh
+++ b/1.10/scala_2.11-debian/docker-entrypoint.sh
@@ -56,6 +56,27 @@ copy_plugins_if_required() {
   done
 }
 
+set_config_option() {
+  local option=$1
+  local value=$2
+
+  # escape periods for usage in regular expressions
+  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
+
+  # either override an existing entry, or append a new one
+  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
+sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
+  else
+echo "${option}: ${value}" >> "${CONF_FILE}"
+  fi
+}
+
+set_common_options() {
+set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
+set_config_option blob.server.port 6124
+set_config_option query.server.port 6125
+}
+
 if [ "$1" = "help" ]; then
 echo "Usage: $(basename "$0") (jobmanager|taskmanager|help)"
 exit 0
@@ -64,23 +85,7 @@ elif [ "$1" = "jobmanager" ]; then
 echo "Starting Job Manager"
 copy_plugins_if_required
 
-if grep -E "^jobmanager\.rpc\.address:.*" "${CONF_FILE}" > /dev/null; then
-sed -i -e "s/jobmanager\.rpc\.address:.*/jobmanager.rpc.address: 
${JOB_MANAGER_RPC_ADDRESS}/g" "${CONF_FILE}"
-else
-echo "jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}" >> 
"${CONF_FILE}"
-fi
-
-if grep -E "^blob\.server\.port:.*" "${CONF_FILE}" > /dev/null; then
-sed -i -e "s/blob\.server\.port:.*/blob.server.port: 6124/g" 
"${CONF_FILE}"
-else
-echo "blob.server.port: 6124" >> "${CONF_FILE}"
-fi
-
-if grep -E "^query\.server\.port:.*" "${CONF_FILE}" > /dev/null; then
-sed -i -e "s/query\.server\.port:.*/query.server.port: 6125/g" 
"${CONF_FILE}"
-else
-echo "query.server.port: 6125" >> "${CONF_FILE}"
-fi
+set_common_options
 
 if [ -n "${FLINK_PROPERTIES}" ]; then
 echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
@@ -95,29 +100,8 @@ elif [ "$1" = "taskmanager" ]; then
 
 
TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep 
-c ^processor /proc/cpuinfo)}
 
-if grep -E "^jobmanager\.rpc\.address:.*" "${CONF_FILE}" > /dev/null; then
-sed -i -e "s/jobmanager\.rpc\.address:.*/jobmanager.rpc.address: 
${JOB_MANAGER_RPC_ADDRESS}/g" "${CONF_FILE}"
-else
-echo "jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}" >> 
"${CONF_FILE}"
-fi
-
-if grep -E "^taskmanager\.numberOfTaskSlots:.*" "${CONF_FILE}" > 
/dev/null; then
-sed -i -e 
"s/taskmanager\.numberOfTaskSlots:.*/taskmanager.numberOfTaskSlots: 
${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}/g" "${CONF_FILE}"
-else
-echo "taskmanager.numberOfTaskSlots: 
${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" >> "${CONF_FILE}"
-fi
-
-if grep -E "^blob\.server\.port:.*" "${CONF_FILE}" > /dev/null; then
-sed -i -e "s/blob\.server\.port:.*/blob.server.port: 6124/g" 
"${CONF_FILE}"
-else
-echo "blob.server.port: 6124" >> "${CONF_FILE}"
-fi
-
-if grep -E "^query\.server\.port:.*" "${CONF_FILE}" > /dev/null; then
-sed -i -e "s/query\.server\.port:.*/query.server.port: 6125/g" 
"${CONF_FILE}"
-else
-echo "query.server.port: 6125" >> "${CONF_FILE}"
-fi
+set_common_options
+set_config_option taskmanager.numberOfTaskSlots 
${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
 
 if [ -n "${FLINK_PROPERTIES}" ]; then
 echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
diff --git a/1.10/scala_2.12-debian/docker-entrypoint.sh 
b/1.10/scala_2.12-debian/docker-entrypoint.sh
index 12e5d5a..011ccea 100755
--- a/1.10/scala_2.12-debian/docker-entrypoint.sh
+++ b/1.10/scala_2.12-debian/docker-entrypoint.sh
@@ -56,6 +56,27 @@ copy_plugins_if_required() {
   done
 }
 
+set_config_option() {
+  local option=$1
+  local value=$2
+
+  # escape periods for usage in 

[flink-docker] branch master updated (9aa8079 -> e7e1903)

2020-04-24 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git.


from 9aa8079  [FLINK-17287][github] Disable merge commit button
 new dbbbfba  [FLINK-17365] Add GPG key for 1.9.3 release
 new e7e1903  [FLINK-17365] Update Dockerfiles for 1.9.3 release

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 1.9/scala_2.11-debian/Dockerfile | 4 ++--
 1.9/scala_2.12-debian/Dockerfile | 4 ++--
 add-version.sh   | 2 ++
 3 files changed, 6 insertions(+), 4 deletions(-)



[flink-docker] 02/02: [FLINK-17365] Update Dockerfiles for 1.9.3 release

2020-04-24 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit e7e19032d43fc8d93cb3763faf32f1bff9df891d
Author: Chesnay Schepler 
AuthorDate: Fri Apr 24 09:32:43 2020 +0200

[FLINK-17365] Update Dockerfiles for 1.9.3 release
---
 1.9/scala_2.11-debian/Dockerfile | 4 ++--
 1.9/scala_2.12-debian/Dockerfile | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/1.9/scala_2.11-debian/Dockerfile b/1.9/scala_2.11-debian/Dockerfile
index d077523..49046f8 100644
--- a/1.9/scala_2.11-debian/Dockerfile
+++ b/1.9/scala_2.11-debian/Dockerfile
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_VERSION=1.9.2 \
+ENV FLINK_VERSION=1.9.3 \
 SCALA_VERSION=2.11 \
-GPG_KEY=EF88474C564C7A608A822EEC3FF96A2057B6476C
+GPG_KEY=6B6291A8502BA8F0913AE04DDEB95B05BF075300
 
 # Prepare environment
 ENV FLINK_HOME=/opt/flink
diff --git a/1.9/scala_2.12-debian/Dockerfile b/1.9/scala_2.12-debian/Dockerfile
index ed8ec7c..aad8384 100644
--- a/1.9/scala_2.12-debian/Dockerfile
+++ b/1.9/scala_2.12-debian/Dockerfile
@@ -44,9 +44,9 @@ RUN set -ex; \
   gosu nobody true
 
 # Configure Flink version
-ENV FLINK_VERSION=1.9.2 \
+ENV FLINK_VERSION=1.9.3 \
 SCALA_VERSION=2.12 \
-GPG_KEY=EF88474C564C7A608A822EEC3FF96A2057B6476C
+GPG_KEY=6B6291A8502BA8F0913AE04DDEB95B05BF075300
 
 # Prepare environment
 ENV FLINK_HOME=/opt/flink



[flink-docker] 01/02: [FLINK-17365] Add GPG key for 1.9.3 release

2020-04-24 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit dbbbfbaa129037ae37a102da85554436b083691b
Author: Chesnay Schepler 
AuthorDate: Fri Apr 24 09:32:19 2020 +0200

[FLINK-17365] Add GPG key for 1.9.3 release
---
 add-version.sh | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/add-version.sh b/add-version.sh
index d2f5f66..d6db723 100755
--- a/add-version.sh
+++ b/add-version.sh
@@ -81,6 +81,8 @@ elif [ "$flink_version" = "1.9.1" ]; then
 gpg_key="E2C45417BED5C104154F341085BACB5AEFAE3202"
 elif [ "$flink_version" = "1.9.2" ]; then
 gpg_key="EF88474C564C7A608A822EEC3FF96A2057B6476C"
+elif [ "$flink_version" = "1.9.3" ]; then
+gpg_key="6B6291A8502BA8F0913AE04DDEB95B05BF075300"
 elif [ "$flink_version" = "1.10.0" ]; then
 gpg_key="BB137807CEFBE7DD2616556710B12A1F89C115E8"
 else



[flink-docker] branch master updated: [FLINK-17287][github] Disable merge commit button

2020-04-24 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 9aa8079  [FLINK-17287][github] Disable merge commit button
9aa8079 is described below

commit 9aa8079fc95bb8d7fc704a880471035fd5b1af61
Author: Chesnay Schepler 
AuthorDate: Tue Apr 21 10:07:26 2020 +0200

[FLINK-17287][github] Disable merge commit button
---
 .asf.yaml | 4 
 1 file changed, 4 insertions(+)

diff --git a/.asf.yaml b/.asf.yaml
index ae3274d..867d33f 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -4,3 +4,7 @@ github:
   labels:
 - flink
 - docker
+  enabled_merge_buttons:
+squash: true
+merge: false
+rebase: true



[flink] branch master updated: [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for JobGraph generation

2020-04-24 Thread zhuzh
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new ef4daeb  [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for 
JobGraph generation
ef4daeb is described below

commit ef4daeba7881cecc1548e387ab68d829f998dc67
Author: Zhu Zhu 
AuthorDate: Fri Apr 24 11:39:36 2020 +0800

[FLINK-17020][runtime] Introduce GlobalDataExchangeMode for JobGraph 
generation
---
 .../api/graph/GlobalDataExchangeMode.java  |  48 +++
 .../flink/streaming/api/graph/StreamGraph.java |  22 +--
 .../streaming/api/graph/StreamGraphGenerator.java  |  12 +-
 .../api/graph/StreamingJobGraphGenerator.java  |  32 -
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  73 --
 ...aphGeneratorWithGlobalDataExchangeModeTest.java | 153 +
 .../flink/table/planner/utils/ExecutorUtils.java   |   3 +-
 7 files changed, 241 insertions(+), 102 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
new file mode 100644
index 000..a74d9f3
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+
+/**
+ * This mode decides the default {@link ResultPartitionType} of job edges.
+ * Note that this only affects job edges which are {@link 
ShuffleMode#UNDEFINED}.
+ */
+public enum GlobalDataExchangeMode {
+   /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
+   ALL_EDGES_BLOCKING,
+
+   /**
+* Set job edges with {@link ForwardPartitioner} to be {@link 
ResultPartitionType#PIPELINED_BOUNDED}
+* and other edges to be {@link ResultPartitionType#BLOCKING}.
+**/
+   FORWARD_EDGES_PIPELINED,
+
+   /**
+* Set job edges with {@link ForwardPartitioner} or {@link 
RescalePartitioner} to be
+* {@link ResultPartitionType#PIPELINED_BOUNDED} and other edges to be 
{@link ResultPartitionType#BLOCKING}.
+**/
+   POINTWISE_EDGES_PIPELINED,
+
+   /** Set all job edges {@link ResultPartitionType#PIPELINED_BOUNDED}. */
+   ALL_EDGES_PIPELINED
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 39393fc..83d9aca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -96,11 +96,7 @@ public class StreamGraph implements Pipeline {
 
private TimeCharacteristic timeCharacteristic;
 
-   /**
-* If there are some stream edges that can not be chained and the 
shuffle mode of edge is not
-* specified, translate these edges into {@code BLOCKING} result 
partition type.
-*/
-   private boolean blockingConnectionsBetweenChains;
+   private GlobalDataExchangeMode globalDataExchangeMode;
 
/** Flag to indicate whether to put all vertices into the same slot 
sharing group by default. */
private boolean allVerticesInSameSlotSharingGroupByDefault = true;
@@ -201,20 +197,12 @@ public class StreamGraph implements Pipeline {
this.timeCharacteristic = timeCharacteristic;
}
 
-   /**
-* If there are some stream edges that can not be chained and the 
shuffle mode of edge is not
-* specified, translate these edges into {@code BLOCKING} result 
partition