[flink] branch release-1.10 updated: [FLINK-15073][sql client] Sql client fails to run same query multiple times

2019-12-11 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new 4a5a720  [FLINK-15073][sql client] Sql client fails to run same query multiple times
4a5a720 is described below

commit 4a5a720024992c12bbfd4fb316d04f24d23a109e
Author: yuzhao.cyz 
AuthorDate: Mon Dec 9 14:46:44 2019 +0800

[FLINK-15073][sql client] Sql client fails to run same query multiple times

After we changed the SQL CLI to be stateful in FLINK-14672, each query's
temporary table was left behind, so the same object (from the same query)
could not be re-registered.
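
The fix registers the sink under a per-query temporary table name and drops that
table again in a finally block. A minimal sketch of the pattern (tableEnv, table,
tableSink and queryConfig are illustrative stand-ins for the fields LocalExecutor
actually uses):

    // Derive a temporary table name from the query and register the sink under it.
    final String tableName = String.format("_tmp_table_%s", Math.abs(query.hashCode()));
    try {
        tableEnv.registerTableSink(tableName, tableSink);
        table.insertInto(queryConfig, tableName);
    } finally {
        // Drop the temporary object even if the insert fails; otherwise running the
        // same query again in the now-stateful session cannot re-register it.
        tableEnv.dropTemporaryTable(tableName);
    }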

This closes #10523

(cherry picked from commit 7bf96cf5fbd76377f5054c3b3f6552615a94c11d)
---
 .../table/client/gateway/local/LocalExecutor.java  |  12 +-
 .../client/gateway/local/LocalExecutorITCase.java  | 121 +
 2 files changed, 130 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index c9f50d0..1174b09 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -619,16 +619,16 @@ public class LocalExecutor implements Executor {
             removeTimeAttributes(table.getSchema()),
             context.getExecutionConfig(),
             context.getClassLoader());
-
        final String jobName = sessionId + ": " + query;
+       final String tableName = String.format("_tmp_table_%s", Math.abs(query.hashCode()));
        final Pipeline pipeline;
        try {
            // writing to a sink requires an optimization step that might reference UDFs during code compilation
            context.wrapClassLoader(() -> {
-               context.getTableEnvironment().registerTableSink(jobName, result.getTableSink());
+               context.getTableEnvironment().registerTableSink(tableName, result.getTableSink());
                table.insertInto(
                    context.getQueryConfig(),
-                   jobName);
+                   tableName);
                return null;
            });
            pipeline = context.createPipeline(jobName, context.getFlinkConfig());
@@ -638,6 +638,12 @@ public class LocalExecutor implements Executor {
            result.close();
            // catch everything such that the query does not crash the executor
            throw new SqlExecutionException("Invalid SQL query.", t);
+       } finally {
+           // Remove the temporal table object.
+           context.wrapClassLoader(() -> {
+               context.getTableEnvironment().dropTemporaryTable(tableName);
+               return null;
+           });
        }
 
// store the result with a unique id
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index c7a19bf..10c797f 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -450,6 +450,50 @@ public class LocalExecutorITCase extends TestLogger {
}
}
 
+   @Test(timeout = 90_000L)
+   public void testStreamQueryExecutionChangelogMultipleTimes() throws Exception {
+       final URL url = getClass().getClassLoader().getResource("test-data.csv");
+       Objects.requireNonNull(url);
+       final Map<String, String> replaceVars = new HashMap<>();
+       replaceVars.put("$VAR_PLANNER", planner);
+       replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+       replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+       replaceVars.put("$VAR_RESULT_MODE", "changelog");
+       replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+       replaceVars.put("$VAR_MAX_ROWS", "100");
+
+       final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
+       final SessionContext session = new 

[flink] branch master updated (98dd5dc -> 7bf96cf)

2019-12-11 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 98dd5dc  [FLINK-14007][python][docs] Add documentation for how to use 
Java user-defined source/sink in Python API
 add 7bf96cf  [FLINK-15073][sql client] Sql client fails to run same query multiple times

No new revisions were added by this update.

Summary of changes:
 .../table/client/gateway/local/LocalExecutor.java  |  12 +-
 .../client/gateway/local/LocalExecutorITCase.java  | 121 +
 2 files changed, 130 insertions(+), 3 deletions(-)



[flink-web] branch asf-site updated (b69c6fe -> f78b71e)

2019-12-11 Thread hequn
This is an automated email from the ASF dual-hosted git repository.

hequn pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from b69c6fe  Rebuild website
 new 493689f  Add Apache Flink release 1.8.3
 new f78b71e  Rebuild website

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 _config.yml|  56 
 _posts/2019-12-11-release-1.8.3.md | 144 +
 content/blog/feed.xml  | 140 
 content/blog/index.html|  38 --
 content/blog/page10/index.html |  25 
 content/blog/page2/index.html  |  38 +++---
 content/blog/page3/index.html  |  40 +++---
 content/blog/page4/index.html  |  38 --
 content/blog/page5/index.html  |  37 --
 content/blog/page6/index.html  |  39 +++---
 content/blog/page7/index.html  |  38 --
 content/blog/page8/index.html  |  37 --
 content/blog/page9/index.html  |  39 +++---
 content/downloads.html |  29 +++--
 content/index.html |  10 +-
 .../11/release-1.8.3.html} | 131 +--
 content/zh/downloads.html  |  33 +++--
 content/zh/index.html  |  10 +-
 18 files changed, 670 insertions(+), 252 deletions(-)
 create mode 100644 _posts/2019-12-11-release-1.8.3.md
 copy content/news/2019/{07/02/release-1.8.1.html => 12/11/release-1.8.3.html} 
(60%)



[flink-web] 01/02: Add Apache Flink release 1.8.3

2019-12-11 Thread hequn
This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 493689f49300f3b68685ccaae6224f1614eaa041
Author: hequn8128 
AuthorDate: Wed Nov 27 12:59:26 2019 +0800

Add Apache Flink release 1.8.3
---
 _config.yml|  56 ---
 _posts/2019-12-11-release-1.8.3.md | 144 +
 2 files changed, 174 insertions(+), 26 deletions(-)

diff --git a/_config.yml b/_config.yml
index 1d880b8..bbf68d3 100644
--- a/_config.yml
+++ b/_config.yml
@@ -129,23 +129,23 @@ flink_releases:
   -
   version_short: 1.8
   binary_release:
-  name: "Apache Flink 1.8.2"
+  name: "Apache Flink 1.8.3"
   scala_211:
-  id: "182-download_211"
-  url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.11.tgz"
-  asc_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.11.tgz.asc"
-  sha512_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.11.tgz.sha512"
+  id: "183-download_211"
+  url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.11.tgz"
+  asc_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.11.tgz.asc"
+  sha512_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.11.tgz.sha512"
   scala_212:
-  id: "182-download_212"
-  url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.12.tgz"
-  asc_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.12.tgz.asc"
-  sha512_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.12.tgz.sha512"
+  id: "183-download_212"
+  url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.12.tgz"
+  asc_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.12.tgz.asc"
+  sha512_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.12.tgz.sha512"
   source_release:
-  name: "Apache Flink 1.8.2"
-  id: "182-download-source"
-  url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.2/flink-1.8.2-src.tgz"
-  asc_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz.asc"
-  sha512_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz.sha512"
+  name: "Apache Flink 1.8.3"
+  id: "183-download-source"
+  url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.3/flink-1.8.3-src.tgz"
+  asc_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-src.tgz.asc"
+  sha512_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-src.tgz.sha512"
   optional_components:
 -
   name: "Pre-bundled Hadoop 2.4.1"
@@ -183,26 +183,26 @@ flink_releases:
   name: "Avro SQL Format"
   category: "SQL Formats"
   scala_dependent: false
-  id: 182-sql-format-avro
-  url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.2/flink-avro-1.8.2.jar
-  asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.2/flink-avro-1.8.2.jar.asc
-  sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.2/flink-avro-1.8.2.jar.sha1
+  id: 183-sql-format-avro
+  url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.3/flink-avro-1.8.3.jar
+  asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.3/flink-avro-1.8.3.jar.asc
+  sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.3/flink-avro-1.8.3.jar.sha1
 -
   name: "CSV SQL Format"
   category: "SQL Formats"
   scala_dependent: false
-  id: 182-sql-format-csv
-  url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.2/flink-csv-1.8.2.jar
-  asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.2/flink-csv-1.8.2.jar.asc
-  sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.2/flink-csv-1.8.2.jar.sha1
+  id: 183-sql-format-csv
+  url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.3/flink-csv-1.8.3.jar
+  asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.3/flink-csv-1.8.3.jar.asc
+  sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.3/flink-csv-1.8.3.jar.sha1
 -
   name: "JSON SQL Format"
   category: "SQL Formats

[flink-web] 02/02: Rebuild website

2019-12-11 Thread hequn
This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit f78b71ee1e70d5350e8527602e08b5ae87bec053
Author: hequn8128 
AuthorDate: Thu Dec 12 14:03:22 2019 +0800

Rebuild website
---
 content/blog/feed.xml  | 140 +++
 content/blog/index.html|  38 ++-
 content/blog/page10/index.html |  25 ++
 content/blog/page2/index.html  |  38 +--
 content/blog/page3/index.html  |  40 ++--
 content/blog/page4/index.html  |  38 ++-
 content/blog/page5/index.html  |  37 +--
 content/blog/page6/index.html  |  39 +--
 content/blog/page7/index.html  |  38 ++-
 content/blog/page8/index.html  |  37 +--
 content/blog/page9/index.html  |  39 +--
 content/downloads.html |  29 ++-
 content/index.html |  10 +-
 content/news/2019/12/11/release-1.8.3.html | 371 +
 content/zh/downloads.html  |  33 ++-
 content/zh/index.html  |  10 +-
 16 files changed, 805 insertions(+), 157 deletions(-)

diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index 0a838e1..c044975 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -7,6 +7,146 @@
 https://flink.apache.org/blog/feed.xml"; rel="self" 
type="application/rss+xml" />
 
 
+Apache Flink 1.8.3 Released
+
+The Apache Flink community released the third bugfix version of the Apache Flink 1.8 series.
+
+This release includes 45 fixes and minor improvements for Flink 1.8.2. The list below includes a detailed list of all fixes and improvements.
+
+We highly recommend all users to upgrade to Flink 1.8.3.
+
+Updated Maven dependencies:
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>1.8.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_2.11</artifactId>
+  <version>1.8.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_2.11</artifactId>
+  <version>1.8.3</version>
+</dependency>
+
+You can find the binaries on the updated Downloads page.
+
+List of resolved issues:
+
+Sub-task
+
+  * [FLINK-13723] - Use liquid-c for faster doc generation
+  * [FLINK-13724] - Remove unnecessary whitespace from the docs' sidenav
+  * [FLINK-13725] - Use sassc for faster doc generation
+  * [FLINK-13726] - Build docs with jekyll 4.0.0.pre.beta1
+  * [FLINK-13791] - Speed up sidenav by using group_by
+
+Bug
+
+  * [FLINK-12342] - Yarn Resource Manager Acquires Too Many Containers
+  * [FLINK-13184] - Starting a TaskExecutor blocks the YarnResourceManager's main thread
+  * [FLINK-13728] - Fix wrong closing tag order in sidenav
+  * [FLINK-13746] - Elasticsearch (v2.3.5) sink end-to-end test fails on Travis
+  * [FLINK-13749] - Make Flink client respect classloading policy
+  * [FLINK-13892] - HistoryServerTest failed on Travis
[flink] branch release-1.10 updated: [FLINK-14007][python][docs] Add documentation for how to use Java user-defined source/sink in Python API

2019-12-11 Thread hequn
This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new 21e3c2f  [FLINK-14007][python][docs] Add documentation for how to use 
Java user-defined source/sink in Python API
21e3c2f is described below

commit 21e3c2f3380bc0fbdd54c074841f69acd4d5d40f
Author: hequn8128 
AuthorDate: Tue Dec 10 21:29:43 2019 +0800

[FLINK-14007][python][docs] Add documentation for how to use Java 
user-defined source/sink in Python API

This closes #10515.
---
 docs/dev/table/sourceSinks.md| 58 +++-
 docs/dev/table/sourceSinks.zh.md | 56 +-
 2 files changed, 76 insertions(+), 38 deletions(-)

diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index d0e4781..12fdb82 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -714,10 +714,11 @@ connector.debug=true
 
 For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the 
Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that 
translate into string-based properties. See the [built-in 
descriptors](connect.html) for sources, sinks, and formats as a reference.
 
-A connector for `MySystem` in our example can extend `ConnectorDescriptor` as 
shown below:
-
 
 
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` 
class.
+
 {% highlight java %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import java.util.HashMap;
@@ -743,9 +744,25 @@ public class MySystemConnector extends ConnectorDescriptor 
{
   }
 }
 {% endhighlight %}
+
+The descriptor can then be used to create a table with the table environment.
+
+{% highlight java %}
+StreamTableEnvironment tableEnv = // ...
+
+tableEnv
+  .connect(new MySystemConnector(true))
+  .withSchema(...)
+  .inAppendMode()
+  .createTemporaryTable("MySystemTable");
+{% endhighlight %}
+
 
 
 
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` 
class.
+
 {% highlight scala %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor
 import java.util.HashMap
@@ -763,25 +780,9 @@ class MySystemConnector(isDebug: Boolean) extends 
ConnectorDescriptor("my-system
   }
 }
 {% endhighlight %}
-
-
 
-The descriptor can then be used in the API as follows:
+The descriptor can then be used to create a table with the table environment.
 
-
-
-{% highlight java %}
-StreamTableEnvironment tableEnv = // ...
-
-tableEnv
-  .connect(new MySystemConnector(true))
-  .withSchema(...)
-  .inAppendMode()
-  .createTemporaryTable("MySystemTable");
-{% endhighlight %}
-
-
-
 {% highlight scala %}
 val tableEnv: StreamTableEnvironment = // ...
 
@@ -791,7 +792,26 @@ tableEnv
   .inAppendMode()
   .createTemporaryTable("MySystemTable")
 {% endhighlight %}
+
+
+
+
+
+You can use a Java `TableFactory` from Python using the 
`CustomConnectorDescriptor`.
+
+{% highlight python %}
+s_env = StreamExecutionEnvironment.get_execution_environment()
+st_env = StreamTableEnvironment.create(s_env)
+
+custom_connector = CustomConnectorDescriptor('my-system', 1, False)
+st_env\
+.connect(custom_connector.property("connector.debug", "true")) \
+.with_schema(...) \
+.in_append_mode()\
+.create_temporary_table("MySystemTable")
+{% endhighlight %}
 
+
 
 
 {% top %}
diff --git a/docs/dev/table/sourceSinks.zh.md b/docs/dev/table/sourceSinks.zh.md
index d0e4781..e6531b0 100644
--- a/docs/dev/table/sourceSinks.zh.md
+++ b/docs/dev/table/sourceSinks.zh.md
@@ -714,10 +714,11 @@ connector.debug=true
 
 For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the 
Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that 
translate into string-based properties. See the [built-in 
descriptors](connect.html) for sources, sinks, and formats as a reference.
 
-A connector for `MySystem` in our example can extend `ConnectorDescriptor` as 
shown below:
-
 
 
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` 
class.
+
 {% highlight java %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import java.util.HashMap;
@@ -743,9 +744,24 @@ public class MySystemConnector extends ConnectorDescriptor 
{
   }
 }
 {% endhighlight %}
+
+The descriptor can then be used to create a table with the table environment.
+
+{% highlight java %}
+StreamTableEnvironment tableEnv = // ...
+
+tableEnv
+  .connect(new MySystemConnector(true))
+  .withSchema(...)
+  .inAppendMode()
+  .createTemporaryTable("MySystemTable");
+{% endhighlight %}
 
 
 
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` 
class.
+
 {% highlight scala %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor
 import java.util.HashMap
@@ -763,25 +779,9 @@ class MySystemConne

[flink] branch master updated: [FLINK-14007][python][docs] Add documentation for how to use Java user-defined source/sink in Python API

2019-12-11 Thread hequn
This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 98dd5dc  [FLINK-14007][python][docs] Add documentation for how to use 
Java user-defined source/sink in Python API
98dd5dc is described below

commit 98dd5dc4cf3b31ff7b93bf880d982306dab27845
Author: hequn8128 
AuthorDate: Tue Dec 10 21:29:43 2019 +0800

[FLINK-14007][python][docs] Add documentation for how to use Java 
user-defined source/sink in Python API

This closes #10515.
---
 docs/dev/table/sourceSinks.md| 58 +++-
 docs/dev/table/sourceSinks.zh.md | 56 +-
 2 files changed, 76 insertions(+), 38 deletions(-)

diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index d0e4781..12fdb82 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -714,10 +714,11 @@ connector.debug=true
 
 For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the 
Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that 
translate into string-based properties. See the [built-in 
descriptors](connect.html) for sources, sinks, and formats as a reference.
 
-A connector for `MySystem` in our example can extend `ConnectorDescriptor` as 
shown below:
-
 
 
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` 
class.
+
 {% highlight java %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import java.util.HashMap;
@@ -743,9 +744,25 @@ public class MySystemConnector extends ConnectorDescriptor 
{
   }
 }
 {% endhighlight %}
+
+The descriptor can then be used to create a table with the table environment.
+
+{% highlight java %}
+StreamTableEnvironment tableEnv = // ...
+
+tableEnv
+  .connect(new MySystemConnector(true))
+  .withSchema(...)
+  .inAppendMode()
+  .createTemporaryTable("MySystemTable");
+{% endhighlight %}
+
 
 
 
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` 
class.
+
 {% highlight scala %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor
 import java.util.HashMap
@@ -763,25 +780,9 @@ class MySystemConnector(isDebug: Boolean) extends 
ConnectorDescriptor("my-system
   }
 }
 {% endhighlight %}
-
-
 
-The descriptor can then be used in the API as follows:
+The descriptor can then be used to create a table with the table environment.
 
-
-
-{% highlight java %}
-StreamTableEnvironment tableEnv = // ...
-
-tableEnv
-  .connect(new MySystemConnector(true))
-  .withSchema(...)
-  .inAppendMode()
-  .createTemporaryTable("MySystemTable");
-{% endhighlight %}
-
-
-
 {% highlight scala %}
 val tableEnv: StreamTableEnvironment = // ...
 
@@ -791,7 +792,26 @@ tableEnv
   .inAppendMode()
   .createTemporaryTable("MySystemTable")
 {% endhighlight %}
+
+
+
+
+
+You can use a Java `TableFactory` from Python using the 
`CustomConnectorDescriptor`.
+
+{% highlight python %}
+s_env = StreamExecutionEnvironment.get_execution_environment()
+st_env = StreamTableEnvironment.create(s_env)
+
+custom_connector = CustomConnectorDescriptor('my-system', 1, False)
+st_env\
+.connect(custom_connector.property("connector.debug", "true")) \
+.with_schema(...) \
+.in_append_mode()\
+.create_temporary_table("MySystemTable")
+{% endhighlight %}
 
+
 
 
 {% top %}
diff --git a/docs/dev/table/sourceSinks.zh.md b/docs/dev/table/sourceSinks.zh.md
index d0e4781..e6531b0 100644
--- a/docs/dev/table/sourceSinks.zh.md
+++ b/docs/dev/table/sourceSinks.zh.md
@@ -714,10 +714,11 @@ connector.debug=true
 
 For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the 
Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that 
translate into string-based properties. See the [built-in 
descriptors](connect.html) for sources, sinks, and formats as a reference.
 
-A connector for `MySystem` in our example can extend `ConnectorDescriptor` as 
shown below:
-
 
 
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` 
class.
+
 {% highlight java %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import java.util.HashMap;
@@ -743,9 +744,24 @@ public class MySystemConnector extends ConnectorDescriptor 
{
   }
 }
 {% endhighlight %}
+
+The descriptor can then be used to create a table with the table environment.
+
+{% highlight java %}
+StreamTableEnvironment tableEnv = // ...
+
+tableEnv
+  .connect(new MySystemConnector(true))
+  .withSchema(...)
+  .inAppendMode()
+  .createTemporaryTable("MySystemTable");
+{% endhighlight %}
 
 
 
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` 
class.
+
 {% highlight scala %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor
 import java.util.HashMap
@@ -763,25 +779,9 @@ class MySystemConnector(isDebug

[flink] branch release-1.10 updated: [FLINK-15061][table] create/alter table/database properties should be stored case-sensitively in the catalog

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new c35ae05  [FLINK-15061][table] create/alter table/database properties should be stored case-sensitively in the catalog
c35ae05 is described below

commit c35ae053942860d7bb4677124cb21821db968954
Author: zjuwangg 
AuthorDate: Mon Dec 9 15:28:39 2019 +0800

[FLINK-15061][table] create/alter table/database properties should be stored case-sensitively in the catalog

this closes #10493.
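
The change removes the toLowerCase() calls in SqlToOperationConverter so that
property keys reach the catalog exactly as the user wrote them. A minimal,
self-contained sketch of the behavioural difference (the key "connector.Path"
is a made-up example, not one used by the commit):

    import java.util.HashMap;
    import java.util.Map;

    public class CaseSensitivePropertiesDemo {
        public static void main(String[] args) {
            // Keys as written in a CREATE TABLE ... WITH (...) clause.
            Map<String, String> ddlProperties = new HashMap<>();
            ddlProperties.put("connector.type", "filesystem");
            ddlProperties.put("connector.Path", "file:///Data/Input"); // mixed-case key on purpose

            // Old behaviour: keys were lower-cased before being stored in the catalog.
            Map<String, String> oldCatalogProps = new HashMap<>();
            ddlProperties.forEach((k, v) -> oldCatalogProps.put(k.toLowerCase(), v));
            System.out.println(oldCatalogProps.containsKey("connector.Path")); // false

            // New behaviour: keys keep their original case.
            Map<String, String> newCatalogProps = new HashMap<>(ddlProperties);
            System.out.println(newCatalogProps.containsKey("connector.Path")); // true
        }
    }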
---
 .../operations/SqlToOperationConverter.java| 11 +++
 .../operations/SqlToOperationConverterTest.java| 35 +++---
 .../table/sqlexec/SqlToOperationConverter.java | 12 +++-
 .../table/sqlexec/SqlToOperationConverterTest.java | 35 +++---
 4 files changed, 56 insertions(+), 37 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index fab5fee..bcd3f34 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -172,8 +172,7 @@ public class SqlToOperationConverter {
// set with properties
Map properties = new HashMap<>();
sqlCreateTable.getPropertyList().getList().forEach(p ->
-   properties.put(((SqlTableOption) 
p).getKeyString().toLowerCase(),
-   ((SqlTableOption) p).getValueString()));
+   properties.put(((SqlTableOption) p).getKeyString(), 
((SqlTableOption) p).getValueString()));
 
TableSchema tableSchema = createTableSchema(sqlCreateTable);
String tableComment = sqlCreateTable.getComment().map(comment ->
@@ -223,8 +222,7 @@ public class SqlToOperationConverter {
Map properties = new 
HashMap<>();

properties.putAll(originalCatalogTable.getProperties());
((SqlAlterTableProperties) 
sqlAlterTable).getPropertyList().getList().forEach(p ->
-   properties.put(((SqlTableOption) 
p).getKeyString().toLowerCase(),
-   ((SqlTableOption) 
p).getValueString()));
+   properties.put(((SqlTableOption) 
p).getKeyString(), ((SqlTableOption) p).getValueString()));
CatalogTable catalogTable = new 
CatalogTableImpl(
originalCatalogTable.getSchema(),
originalCatalogTable.getPartitionKeys(),
@@ -386,8 +384,7 @@ public class SqlToOperationConverter {
// set with properties
Map properties = new HashMap<>();
sqlCreateDatabase.getPropertyList().getList().forEach(p ->
-   properties.put(((SqlTableOption) 
p).getKeyString().toLowerCase(),
-   ((SqlTableOption) p).getValueString()));
+   properties.put(((SqlTableOption) p).getKeyString(), 
((SqlTableOption) p).getValueString()));
CatalogDatabase catalogDatabase = new 
CatalogDatabaseImpl(properties, databaseComment);
return new CreateDatabaseOperation(catalogName, databaseName, 
catalogDatabase, ignoreIfExists);
}
@@ -430,7 +427,7 @@ public class SqlToOperationConverter {
}
// set with properties
sqlAlterDatabase.getPropertyList().getList().forEach(p ->
-   properties.put(((SqlTableOption) 
p).getKeyString().toLowerCase(), ((SqlTableOption) p).getValueString()));
+   properties.put(((SqlTableOption) p).getKeyString(), 
((SqlTableOption) p).getValueString()));
CatalogDatabase catalogDatabase = new 
CatalogDatabaseImpl(properties, originCatalogDatabase.getComment());
return new AlterDatabaseOperation(catalogName, databaseName, 
catalogDatabase);
}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 61239fe..c5bdef7 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner-bl

[flink] branch master updated (5b09f19 -> ab4c31c)

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


 from 5b09f19  [FLINK-13577][ml] Add a util class to build result row and generate result schema.
 add ab4c31c  [FLINK-15061][table] create/alter table/database properties should be stored case-sensitively in the catalog

No new revisions were added by this update.

Summary of changes:
 .../operations/SqlToOperationConverter.java| 11 +++
 .../operations/SqlToOperationConverterTest.java| 35 +++---
 .../table/sqlexec/SqlToOperationConverter.java | 12 +++-
 .../table/sqlexec/SqlToOperationConverterTest.java | 35 +++---
 4 files changed, 56 insertions(+), 37 deletions(-)



[flink] branch master updated (bdb4de7 -> 5b09f19)

2019-12-11 Thread rongr
This is an automated email from the ASF dual-hosted git repository.

rongr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from bdb4de7  [hotfix][doc] update catalog APIs
 add 5b09f19  [FLINK-13577][ml] Add a util class to build result row and generate result schema.

No new revisions were added by this update.

Summary of changes:
 .../flink/ml/common/utils/OutputColsHelper.java| 202 
 .../ml/common/utils/OutputColsHelperTest.java  | 263 +
 2 files changed, 465 insertions(+)
 create mode 100644 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/OutputColsHelper.java
 create mode 100644 
flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/utils/OutputColsHelperTest.java



[flink] branch release-1.10 updated: [hotfix][doc] update catalog APIs

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new a3d708a  [hotfix][doc] update catalog APIs
a3d708a is described below

commit a3d708a679123f7ad4db91c2a8c68769ad430baa
Author: bowen.li 
AuthorDate: Wed Dec 11 12:01:41 2019 -0800

[hotfix][doc] update catalog APIs

this closes #10525.
---
 docs/dev/table/catalogs.md| 245 ++
 docs/dev/table/catalogs.zh.md | 245 ++
 2 files changed, 490 insertions(+)

diff --git a/docs/dev/table/catalogs.md b/docs/dev/table/catalogs.md
index cfd34b5..3cda99c 100644
--- a/docs/dev/table/catalogs.md
+++ b/docs/dev/table/catalogs.md
@@ -54,8 +54,253 @@ To use custom catalogs in SQL CLI, users should develop 
both a catalog and its c
 The catalog factory defines a set of properties for configuring the catalog 
when the SQL CLI bootstraps.
 The set of properties will be passed to a discovery service where the service 
tries to match the properties to a `CatalogFactory` and initiate a 
corresponding catalog instance.
 
+## How to Create and Register Flink Tables to Catalog
+
+### Using SQL DDL
+
+Users can use SQL DDL to create tables in catalogs in both Table API and SQL.
+
+For Table API:
+
+
+
+{% highlight java %}
+TableEnvironment tableEnv = ...
+
+// Create a HiveCatalog 
+Catalog catalog = new HiveCatalog("myhive", null, "", 
"");
+
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog);
+
+// Create a catalog database
+tableEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)");
+
+// Create a catalog table
+tableEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
+
+tableEnv.sqlQuery("SHOW TABLES"); // should see the table
+
+{% endhighlight %}
+
+
+
+For SQL Client:
+
+{% highlight sql %}
+-- the catalog should have been registered via yaml file
+Flink SQL> CREATE DATABASE mydb WITH (...);
+
+Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
+
+Flink SQL> SHOW TABLES;
+mytable
+{% endhighlight %}
+
+For detailed information, please check out [Flink SQL DDL]({{ site.baseurl }}/dev/table/sql.html#create-table).
+
+### Using Java/Scala/Python API
+
+Users can use Java, Scala, or Python API to create catalog tables 
programmatically.
+
+
+
+{% highlight java %}
+TableEnvironment tableEnv = ...
+
+// Create a HiveCatalog 
+Catalog catalog = new HiveCatalog("myhive", null, "", 
"");
+
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog);
+
+// Create a catalog database 
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
+
+// Create a catalog table
+TableSchema schema = TableSchema.builder()
+.field("name", DataTypes.STRING())
+.field("age", DataTypes.INT())
+.build();
+
+catalog.createTable(
+    new ObjectPath("mydb", "mytable"),
+    new CatalogTableImpl(
+        schema,
+        new Kafka()
+            .version("0.11")
+            .startFromEarliest(),
+        "my comment"
+    )
+);
+
+List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
+
+
+
+
 ## Catalog API
 
+Note: only catalog program APIs are listed here. Users can achieve many of the same functionalities with SQL DDL.
+For detailed DDL information, please refer to [SQL DDL](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#ddl).
+
+
+### Database operations
+
+
+
+{% highlight java %}
+// create database
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
+
+// drop database
+catalog.dropDatabase("mydb", false);
+
+// alter database
+catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
+
+// get database
+catalog.getDatabase("mydb");
+
+// check if a database exists
+catalog.databaseExists("mydb");
+
+// list databases in a catalog
+catalog.listDatabases("mycatalog");
+{% endhighlight %}
+
+
+
+### Table operations
+
+
+
+{% highlight java %}
+// create table
+catalog.createTable(new ObjectPath("mydb", "mytable"), new 
CatalogTableImpl(...), false);
+
+// drop table
+catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
+
+// alter table
+catalog.alterTable(new ObjectPath("mydb", "mytable"), new 
CatalogTableImpl(...), false);
+
+// rename table
+catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
+
+// get table
+catalog.getTable("mytable");
+
+// check if a table exists
+catalog.tableExists("mytable");
+
+// list tables in a database
+catalog.listTables("mydb");
+{% endhighlight %}
+
+
+
+### View operations
+
+
+
+{% highlight java %}
+// create view
+catalog.createTable(new ObjectPath("mydb", "myview"), new 
CatalogViewImpl(...), false);
+
+// drop view
+catalog.dropTable(new ObjectPath("mydb", "myview"), false);
+
+// alter view
+catalog.al

[flink] branch master updated (e40e158 -> bdb4de7)

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e40e158  [FLINK-15203][doc] rephrase Hive's data types doc
 add bdb4de7  [hotfix][doc] update catalog APIs

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/catalogs.md| 245 ++
 docs/dev/table/catalogs.zh.md | 245 ++
 2 files changed, 490 insertions(+)



[flink] branch master updated (26de396 -> e40e158)

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 26de396  [hotfix][doc] update Hive doc
 add e40e158  [FLINK-15203][doc] rephrase Hive's data types doc

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/hive/index.md| 23 +--
 docs/dev/table/hive/index.zh.md | 19 +--
 2 files changed, 22 insertions(+), 20 deletions(-)



[flink] branch release-1.10 updated: [FLINK-15203][doc] rephrase Hive's data types doc

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new df54cd1  [FLINK-15203][doc] rephrase Hive's data types doc
df54cd1 is described below

commit df54cd10c4f18904366833132bdeaa33427e8ba4
Author: bowen.li 
AuthorDate: Wed Dec 11 12:37:26 2019 -0800

[FLINK-15203][doc] rephrase Hive's data types doc

this closes #10537.
---
 docs/dev/table/hive/index.md| 23 +--
 docs/dev/table/hive/index.zh.md | 19 +--
 2 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md
index 9a83882..e242ccf 100644
--- a/docs/dev/table/hive/index.md
+++ b/docs/dev/table/hive/index.md
@@ -272,6 +272,10 @@ Currently `HiveCatalog` supports most Flink data types 
with the following mappin
 DATE
 
 
+TIMESTAMP
+TIMESTAMP
+
+
 BYTES
 BINARY
 
@@ -290,15 +294,14 @@ Currently `HiveCatalog` supports most Flink data types 
with the following mappin
   
 
 
-### Limitations
-
-The following limitations in Hive's data types impact the mapping between 
Flink and Hive:
 
-* `CHAR(p)` has a maximum length of 255
-* `VARCHAR(p)` has a maximum length of 65535
+* Hive's `CHAR(p)` has a maximum length of 255
+* Hive's `VARCHAR(p)` has a maximum length of 65535
 * Hive's `MAP` only supports primitive key types while Flink's `MAP` can be 
any data type
-* Hive's `UNION` type is not supported
-* Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type
-* Flink's `TIMESTAMP_WITH_TIME_ZONE` and `TIMESTAMP_WITH_LOCAL_TIME_ZONE` are 
not supported by Hive
-* Flink's `TIMESTAMP_WITHOUT_TIME_ZONE` type cannot be mapped to Hive's 
`TIMESTAMP` type due to precision difference.
-* Flink's `MULTISET` is not supported by Hive
+
+
+Note that:
+
+* Flink doesn't support Hive's `UNION` type
+* Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET`
+* Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type yet
diff --git a/docs/dev/table/hive/index.zh.md b/docs/dev/table/hive/index.zh.md
index 1136ffe..708e228 100644
--- a/docs/dev/table/hive/index.zh.md
+++ b/docs/dev/table/hive/index.zh.md
@@ -290,15 +290,14 @@ Currently `HiveCatalog` supports most Flink data types 
with the following mappin
   
 
 
-### Limitations
 
-The following limitations in Hive's data types impact the mapping between 
Flink and Hive:
-
-* `CHAR(p)` has a maximum length of 255
-* `VARCHAR(p)` has a maximum length of 65535
+* Hive's `CHAR(p)` has a maximum length of 255
+* Hive's `VARCHAR(p)` has a maximum length of 65535
 * Hive's `MAP` only supports primitive key types while Flink's `MAP` can be 
any data type
-* Hive's `UNION` type is not supported
-* Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type
-* Flink's `TIMESTAMP_WITH_TIME_ZONE` and `TIMESTAMP_WITH_LOCAL_TIME_ZONE` are 
not supported by Hive
-* Flink's `TIMESTAMP_WITHOUT_TIME_ZONE` type cannot be mapped to Hive's 
`TIMESTAMP` type due to precision difference.
-* Flink's `MULTISET` is not supported by Hive
+
+
+Note that:
+
+* Flink doesn't support Hive's `UNION` type
+* Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET`
+* Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type yet



[flink] branch master updated (940613b -> 26de396)

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 940613b  [hotfix][doc] update and remove limitations of Hive connector
 add 26de396  [hotfix][doc] update Hive doc

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/hive/index.md| 17 -
 docs/dev/table/hive/index.zh.md | 17 -
 2 files changed, 24 insertions(+), 10 deletions(-)



[flink] branch release-1.10 updated: [hotfix][doc] update and remove limitations of Hive connector

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new 393bf85  [hotfix][doc] update and remove limitations of Hive connector
393bf85 is described below

commit 393bf85ba6e8ebaeb94f83627dd2410d0215a7aa
Author: bowen.li 
AuthorDate: Wed Dec 11 12:51:05 2019 -0800

[hotfix][doc] update and remove limitations of Hive connector
---
 docs/dev/table/hive/read_write_hive.md| 21 +++--
 docs/dev/table/hive/read_write_hive.zh.md | 21 +++--
 2 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/docs/dev/table/hive/read_write_hive.md 
b/docs/dev/table/hive/read_write_hive.md
index 4a48060..9814ceb 100644
--- a/docs/dev/table/hive/read_write_hive.md
+++ b/docs/dev/table/hive/read_write_hive.md
@@ -24,7 +24,6 @@ under the License.
 
 Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and 
write from Hive data as an alternative to Hive's batch engine. Be sure to 
follow the instructions to include the correct [dependencies]({{ site.baseurl 
}}/dev/table/hive/#depedencies) in your application.  
 
-
 * This will be replaced by the TOC
 {:toc}
 
@@ -118,14 +117,16 @@ Similarly, data can be written into hive using an `INSERT 
INTO` clause.
 Flink SQL> INSERT INTO mytable (name, value) VALUES ('Tom', 4.72);
 {% endhighlight %}
 
-### Limitations
+## Formats
+
+We have tested the following table storage formats: text, csv, SequenceFile, ORC, and Parquet.
+
+## Roadmap
+
+We are planning and actively working on supporting features like
 
-The following is a list of major limitations of the Hive connector. And we're 
actively working to close these gaps.
+- ACID tables
+- bucketed tables
+- more formats
 
-1. INSERT OVERWRITE is not supported.
-2. Inserting into partitioned tables is not supported.
-3. ACID tables are not supported.
-4. Bucketed tables are not supported.
-5. Some data types are not supported. See the [limitations]({{ site.baseurl 
}}/dev/table/hive/#limitations) for details.
-6. Only a limited number of table storage formats have been tested, namely 
text, SequenceFile, ORC, and Parquet.
-7. Views are not supported.
+Please reach out to the community with feature requests: https://flink.apache.org/community.html#mailing-lists
diff --git a/docs/dev/table/hive/read_write_hive.zh.md 
b/docs/dev/table/hive/read_write_hive.zh.md
index 9b9bf77..9814ceb 100644
--- a/docs/dev/table/hive/read_write_hive.zh.md
+++ b/docs/dev/table/hive/read_write_hive.zh.md
@@ -24,7 +24,6 @@ under the License.
 
 Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and 
write from Hive data as an alternative to Hive's batch engine. Be sure to 
follow the instructions to include the correct [dependencies]({{ site.baseurl 
}}/dev/table/hive/#depedencies) in your application.  
 
-
 * This will be replaced by the TOC
 {:toc}
 
@@ -118,14 +117,16 @@ Similarly, data can be written into hive using an `INSERT 
INTO` clause.
 Flink SQL> INSERT INTO mytable (name, value) VALUES ('Tom', 4.72);
 {% endhighlight %}
 
-### Limitations
+## Formats
+
+We have tested the following table storage formats: text, csv, SequenceFile, ORC, and Parquet.
+
+## Roadmap
+
+We are planning and actively working on supporting features like
 
-The following is a list of major limitations of the Hive connector. And we're 
actively working to close these gaps.
+- ACID tables
+- bucketed tables
+- more formats
 
-1. INSERT OVERWRITE is not supported.
-2. Inserting into partitioned tables is not supported.
-3. ACID tables are not supported.
-4. Bucketed tables are not supported.
-5. Some data types are not supported. See the [limitations]({{ site.baseurl 
}}/zh/dev/table/hive/#limitations) for details.
-6. Only a limited number of table storage formats have been tested, namely 
text, SequenceFile, ORC, and Parquet.
-7. Views are not supported.
+Please reach out to the community with feature requests: https://flink.apache.org/community.html#mailing-lists



[flink] branch master updated (9277e9e -> 940613b)

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 9277e9e  [FLINK-15199][rocksdb] Make RocksDBResourceContainer public 
for the benchmarks
 add 940613b  [hotfix][doc] update and remove limitations of Hive connector

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/hive/read_write_hive.md| 21 +++--
 docs/dev/table/hive/read_write_hive.zh.md | 21 +++--
 2 files changed, 22 insertions(+), 20 deletions(-)



[flink] branch release-1.10 updated: [hotfix][doc] update Hive doc

2019-12-11 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new 26b8002  [hotfix][doc] update Hive doc
26b8002 is described below

commit 26b8002c0a90d9cb452a2700f58b221a3f0c05c4
Author: bowen.li 
AuthorDate: Tue Dec 10 20:03:46 2019 -0800

[hotfix][doc] update Hive doc
---
 docs/dev/table/hive/index.md| 17 -
 docs/dev/table/hive/index.zh.md | 17 -
 2 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md
index cc0c716..9a83882 100644
--- a/docs/dev/table/hive/index.md
+++ b/docs/dev/table/hive/index.md
@@ -75,7 +75,7 @@ Flink supports the following Hive versions.
 
 ### Dependencies
 
-To integrate with Hive, users need the following dependencies in their project.
+To integrate with Hive, users need some dependencies in their project. We are 
using Hive 2.3.4 and 1.2.1 as examples here.
 
 
 
@@ -96,7 +96,9 @@ To integrate with Hive, users need the following dependencies 
in their project.
   provided
 
 
-
+
 
 
   org.apache.flink
@@ -132,7 +134,9 @@ To integrate with Hive, users need the following 
dependencies in their project.
   provided
 
 
-
+
 
 
   org.apache.flink
@@ -167,13 +171,16 @@ To integrate with Hive, users need the following 
dependencies in their project.
 
 Connect to an existing Hive installation using the Hive [Catalog]({{ 
site.baseurl }}/dev/table/catalogs.html) through the table environment or YAML 
configuration.
 
+If the `hive-conf/hive-site.xml` file is stored in a remote storage system, users should download
+the Hive configuration file to their local environment first.
+
 
 
 {% highlight java %}
 
 String name= "myhive";
 String defaultDatabase = "mydatabase";
-String hiveConfDir = "/opt/hive-conf";
+String hiveConfDir = "/opt/hive-conf"; // a local path
 String version = "2.3.4"; // or 1.2.1
 
 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version);
@@ -185,7 +192,7 @@ tableEnv.registerCatalog("myhive", hive);
 
 val name= "myhive"
 val defaultDatabase = "mydatabase"
-val hiveConfDir = "/opt/hive-conf"
+val hiveConfDir = "/opt/hive-conf" // a local path
 val version = "2.3.4" // or 1.2.1
 
 val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
diff --git a/docs/dev/table/hive/index.zh.md b/docs/dev/table/hive/index.zh.md
index 205ecfe..1136ffe 100644
--- a/docs/dev/table/hive/index.zh.md
+++ b/docs/dev/table/hive/index.zh.md
@@ -75,7 +75,7 @@ Flink supports the following Hive versions.
 
 ### Dependencies
 
-To integrate with Hive, users need the following dependencies in their project.
+To integrate with Hive, users need some dependencies in their project. We are 
using Hive 2.3.4 and 1.2.1 as examples here.
 
 
 
@@ -96,7 +96,9 @@ To integrate with Hive, users need the following dependencies 
in their project.
   provided
 
 
-
+
 
 
   org.apache.flink
@@ -132,7 +134,9 @@ To integrate with Hive, users need the following 
dependencies in their project.
   provided
 
 
-
+
 
 
   org.apache.flink
@@ -167,13 +171,16 @@ To integrate with Hive, users need the following 
dependencies in their project.
 
 Connect to an existing Hive installation using the Hive [Catalog]({{ 
site.baseurl }}/dev/table/catalogs.html) through the table environment or YAML 
configuration.
 
+If the `hive-conf/hive-site.xml` file is stored in a remote storage system, users should download
+the Hive configuration file to their local environment first.
+
 
 
 {% highlight java %}
 
 String name= "myhive";
 String defaultDatabase = "mydatabase";
-String hiveConfDir = "/opt/hive-conf";
+String hiveConfDir = "/opt/hive-conf"; // a local path
 String version = "2.3.4"; // or 1.2.1
 
 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version);
@@ -185,7 +192,7 @@ tableEnv.registerCatalog("myhive", hive);
 
 val name= "myhive"
 val defaultDatabase = "mydatabase"
-val hiveConfDir = "/opt/hive-conf"
+val hiveConfDir = "/opt/hive-conf" // a local path
 val version = "2.3.4" // or 1.2.1
 
 val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)



[flink] branch master updated (04ab225 -> 9277e9e)

2019-12-11 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 04ab225  [FLINK-15069][benchmark] Supplement the pipelined shuffle 
compression case for benchmark
 add 9277e9e  [FLINK-15199][rocksdb] Make RocksDBResourceContainer public 
for the benchmarks

No new revisions were added by this update.

Summary of changes:
 .../flink/contrib/streaming/state/RocksDBResourceContainer.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[flink] branch master updated (07f08fb -> 04ab225)

2019-12-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 07f08fb  [FLINK-14926][state-backend-rocksdb] (follow-up) Replace 
OptionsFactory for RocksDBOptionsFactory
 add 04ab225  [FLINK-15069][benchmark] Supplement the pipelined shuffle 
compression case for benchmark

No new revisions were added by this update.

Summary of changes:
 ...a => StreamNetworkCompressionThroughputBenchmark.java} | 15 ++-
 ... StreamNetworkCompressionThroughputBenchmarkTest.java} |  6 +++---
 2 files changed, 13 insertions(+), 8 deletions(-)
 copy 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/{StreamNetworkBroadcastThroughputBenchmark.java
 => StreamNetworkCompressionThroughputBenchmark.java} (74%)
 copy 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/{StreamNetworkBroadcastThroughputBenchmarkTest.java
 => StreamNetworkCompressionThroughputBenchmarkTest.java} (83%)



[flink] 03/04: [hotfix][kubernetes] Separate creating and retrieving flink cluster logs

2019-12-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d61ef1c3f02068d8fb7a23dd4d71db643c0491f1
Author: wangyang0918 
AuthorDate: Mon Dec 9 20:40:46 2019 +0800

[hotfix][kubernetes] Separate creating and retrieving flink cluster logs

Currently, when we create a Flink cluster on Kubernetes, two "retrieving" log messages
and one "creating" log message show up, which is confusing. The logs for creating and
retrieving a Flink cluster should be separated.
---
 .../kubernetes/KubernetesClusterDescriptor.java| 40 ++
 1 file changed, 19 insertions(+), 21 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
index ff04a59..9d11954 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
@@ -91,14 +91,7 @@ public class KubernetesClusterDescriptor implements 
ClusterDescriptor {
}
 
try {
-
-   RestClusterClient resultClient = new 
RestClusterClient<>(
-   configuration, clusterId);
-   LOG.info(
-   "Succesfully retrieved cluster 
client for cluster {}, JobManager Web Interface : {}",
-   clusterId,
-   
resultClient.getWebInterfaceURL());
-   return resultClient;
+   return new RestClusterClient<>(configuration, 
clusterId);
} catch (Exception e) {
client.handleException(e);
throw new RuntimeException(new 
ClusterRetrieveException("Could not create the RestClusterClient.", e));
@@ -108,17 +101,31 @@ public class KubernetesClusterDescriptor implements 
ClusterDescriptor {
 
@Override
public ClusterClientProvider retrieve(String clusterId) {
-   return createClusterClientProvider(clusterId);
+   final ClusterClientProvider clusterClientProvider = 
createClusterClientProvider(clusterId);
+
+   try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {
+   LOG.info(
+   "Retrieve flink cluster {} successfully, 
JobManager Web Interface: {}",
+   clusterId,
+   clusterClient.getWebInterfaceURL());
+   }
+   return clusterClientProvider;
}
 
@Override
public ClusterClientProvider 
deploySessionCluster(ClusterSpecification clusterSpecification) throws 
ClusterDeploymentException {
-   final ClusterClientProvider clusterClient = 
deployClusterInternal(
+   final ClusterClientProvider clusterClientProvider = 
deployClusterInternal(
KubernetesSessionClusterEntrypoint.class.getName(),
clusterSpecification,
false);
 
-   return clusterClient;
+   try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {
+   LOG.info(
+   "Create flink session cluster {} successfully, 
JobManager Web Interface: {}",
+   clusterId,
+   clusterClient.getWebInterfaceURL());
+   }
+   return clusterClientProvider;
}
 
@Override
@@ -180,16 +187,7 @@ public class KubernetesClusterDescriptor implements 
ClusterDescriptor {
client.createConfigMap();

client.createFlinkMasterDeployment(clusterSpecification);
 
-   ClusterClientProvider clusterClientProvider = 
createClusterClientProvider(clusterId);
-
-   try (ClusterClient clusterClient = 
clusterClientProvider.getClusterClient()) {
-   LOG.info(
-   "Create flink session cluster 
{} successfully, JobManager Web Interface: {}",
-   clusterId,
-   
clusterClient.getWebInterfaceURL());
-   }
-
-   return clusterClientProvider;
+   return createClusterClientProvider(clusterId);
} catch (Exception e) {
client.handleException(e);
throw new ClusterDeploymentException("Coul

[flink] 01/04: [hotfix][kubernetes] Remove per-job related config options

2019-12-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef4ad4491980a59a1722af9ba61bd09f4dbdfd53
Author: wangyang0918 
AuthorDate: Mon Dec 9 18:06:33 2019 +0800

[hotfix][kubernetes] Remove per-job related config options
---
 .../kubernetes/cli/FlinkKubernetesCustomCli.java   | 28 --
 .../flink/kubernetes/cli/KubernetesCliOptions.java | 16 -
 2 files changed, 44 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java
index 620abb1..9176e9a 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java
@@ -33,7 +33,6 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
@@ -57,8 +56,6 @@ import static 
org.apache.flink.kubernetes.cli.KubernetesCliOptions.CLUSTER_ID_OP
 import static 
org.apache.flink.kubernetes.cli.KubernetesCliOptions.DYNAMIC_PROPERTY_OPTION;
 import static org.apache.flink.kubernetes.cli.KubernetesCliOptions.HELP_OPTION;
 import static 
org.apache.flink.kubernetes.cli.KubernetesCliOptions.IMAGE_OPTION;
-import static 
org.apache.flink.kubernetes.cli.KubernetesCliOptions.JOB_CLASS_NAME_OPTION;
-import static 
org.apache.flink.kubernetes.cli.KubernetesCliOptions.JOB_ID_OPTION;
 
 /**
  * Kubernetes customized commandline.
@@ -85,9 +82,6 @@ public class FlinkKubernetesCustomCli extends 
AbstractCustomCommandLine {
private final Option dynamicPropertiesOption;
private final Option helpOption;
 
-   private final Option jobClassOption;
-   private final Option jobIdOption;
-
private final ClusterClientServiceLoader clusterClientServiceLoader;
 
public FlinkKubernetesCustomCli(Configuration configuration, String 
shortPrefix, String longPrefix) {
@@ -115,9 +109,6 @@ public class FlinkKubernetesCustomCli extends 
AbstractCustomCommandLine {
this.taskManagerSlotsOption = 
KubernetesCliOptions.getOptionWithPrefix(
KubernetesCliOptions.TASK_MANAGER_SLOTS_OPTION, 
shortPrefix, longPrefix);
 
-   this.jobClassOption = 
KubernetesCliOptions.getOptionWithPrefix(JOB_CLASS_NAME_OPTION, shortPrefix, 
longPrefix);
-   this.jobIdOption = 
KubernetesCliOptions.getOptionWithPrefix(JOB_ID_OPTION, shortPrefix, 
longPrefix);
-
this.helpOption = 
KubernetesCliOptions.getOptionWithPrefix(HELP_OPTION, shortPrefix, longPrefix);
}
 
@@ -197,25 +188,6 @@ public class FlinkKubernetesCustomCli extends 
AbstractCustomCommandLine {
effectiveConfiguration.setString(key, 
dynamicProperties.getProperty(key));
}
 
-   final StringBuilder entryPointClassArgs = new StringBuilder();
-   if (commandLine.hasOption(jobClassOption.getOpt())) {
-   entryPointClassArgs.append(" --")
-   .append(jobClassOption.getLongOpt())
-   .append(" ")
-   
.append(commandLine.getOptionValue(jobClassOption.getOpt()));
-   }
-
-   if (commandLine.hasOption(jobIdOption.getOpt())) {
-   entryPointClassArgs.append(" --")
-   .append(jobIdOption.getLongOpt())
-   .append(" ")
-   
.append(commandLine.getOptionValue(jobIdOption.getOpt()));
-   }
-   if (!entryPointClassArgs.toString().isEmpty()) {
-   
effectiveConfiguration.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS_ARGS,
-   entryPointClassArgs.toString());
-   }
-
return effectiveConfiguration;
}
 
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java
index 48153bc..b71eb00 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java
@@ -91,22 +91,6 

[flink] 04/04: [FLINK-15153][kubernetes] Service selector needs to contain jobmanager component label

2019-12-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f7ad848964c00bee490be0a4ea6d715426179d8
Author: wangyang0918 
AuthorDate: Mon Dec 9 18:08:03 2019 +0800

[FLINK-15153][kubernetes] Service selector needs to contain jobmanager 
component label

The jobmanager label needs to be added to the service selector. Otherwise, the service may select the wrong backend pods (taskmanagers).
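
As a minimal, self-contained sketch of the effect (not the actual decorator code): the selector starts from the labels already present on the resource and additionally pins the jobmanager component, so taskmanager pods that share the common labels no longer match. The concrete label keys and values below are illustrative assumptions standing in for Flink's Constants.

    import java.util.HashMap;
    import java.util.Map;

    public class JobManagerSelectorSketch {
        public static void main(String[] args) {
            // Labels already carried by the deployment metadata (illustrative values).
            Map<String, String> existingLabels = new HashMap<>();
            existingLabels.put("app", "my-flink-cluster");
            existingLabels.put("type", "flink-native-kubernetes");

            // FLINK-15153: also pin the selector to the jobmanager component so the
            // service does not route traffic to taskmanager pods with the same labels.
            Map<String, String> selector = new HashMap<>(existingLabels);
            selector.put("component", "jobmanager"); // assumed key/value for the component label

            System.out.println(selector);
        }
    }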
---
 .../flink/kubernetes/kubeclient/decorators/ServiceDecorator.java | 9 -
 .../apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java| 3 +++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java
index 98c4634..f8c4493 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java
@@ -33,6 +33,7 @@ import io.fabric8.kubernetes.api.model.ServiceSpec;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Setup services port.
@@ -89,7 +90,13 @@ public class ServiceDecorator extends Decorator {
}
 
spec.setPorts(servicePorts);
-   spec.setSelector(resource.getMetadata().getLabels());
+
+   final Map labels = new LabelBuilder()
+   .withExist(resource.getMetadata().getLabels())
+   .withJobManagerComponent()
+   .toLabels();
+
+   spec.setSelector(labels);
 
resource.setSpec(spec);
 
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java
index 17b52c7..08ebedf 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java
@@ -106,6 +106,8 @@ public class Fabric8ClientTest extends KubernetesTestBase {
 

assertEquals(KubernetesConfigOptions.ServiceExposedType.ClusterIP.toString(), 
service.getSpec().getType());
 
+   // The selector labels should contain jobmanager component
+   labels.put(Constants.LABEL_COMPONENT_KEY, 
Constants.LABEL_COMPONENT_JOB_MANAGER);
assertEquals(labels, service.getSpec().getSelector());
 

assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()),
@@ -133,6 +135,7 @@ public class Fabric8ClientTest extends KubernetesTestBase {
 

assertEquals(KubernetesConfigOptions.ServiceExposedType.LoadBalancer.toString(),
 service.getSpec().getType());
 
+   labels.put(Constants.LABEL_COMPONENT_KEY, 
Constants.LABEL_COMPONENT_JOB_MANAGER);
assertEquals(labels, service.getSpec().getSelector());
 

assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()),



[flink] 02/04: [hotfix][kubernetes] Correct the description of cli options

2019-12-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c1651914a562eb58ad92bc7931d9110ae4b7b05f
Author: wangyang0918 
AuthorDate: Tue Dec 10 14:39:02 2019 +0800

[hotfix][kubernetes] Correct the description of cli options

Currently, some CLI option descriptions are not shown correctly, because KubernetesConfigOptions.CONTAINER_IMAGE.description().toString() is used and Description#toString() does not return the actual description content.
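
To illustrate the failure mode with a self-contained stand-in (Flink's actual Description class is not used here): an object whose toString() does not return the description text yields an unhelpful string, while passing the plain text to the option builder produces the intended help output.

    // Stand-in for a Description-style object; illustrative only, not Flink's class.
    public class DescriptionToStringSketch {

        static final class Description {
            private final String text;
            Description(String text) { this.text = text; }
            // No meaningful toString(): callers get something like "Description@1b6d3586".
        }

        public static void main(String[] args) {
            Description description = new Description("Image to use for Flink containers");
            System.out.println(description.toString());              // not the description content
            System.out.println("Image to use for Flink containers"); // what the CLI help should print
        }
    }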
---
 docs/_includes/generated/kubernetes_config_configuration.html  | 2 +-
 .../java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java | 7 +++
 .../flink/kubernetes/configuration/KubernetesConfigOptions.java| 2 +-
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/docs/_includes/generated/kubernetes_config_configuration.html 
b/docs/_includes/generated/kubernetes_config_configuration.html
index 932a21e..192b0d6 100644
--- a/docs/_includes/generated/kubernetes_config_configuration.html
+++ b/docs/_includes/generated/kubernetes_config_configuration.html
@@ -12,7 +12,7 @@
 kubernetes.cluster-id
 (none)
 String
-The cluster id that will be used for flink cluster. If it's 
not set, the client will generate a random UUID name.
+The cluster id used for identifying the unique flink cluster. 
If it's not set, the client will generate a random UUID name.
 
 
 kubernetes.config.file
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java
index b71eb00..c95e04c 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.kubernetes.cli;
 
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-
 import org.apache.commons.cli.Option;
 
 /**
@@ -46,7 +44,8 @@ public class KubernetesCliOptions {
.longOpt("clusterId")
.required(false)
.hasArg(true)
-   
.desc(KubernetesConfigOptions.CLUSTER_ID.description().toString())
+   .desc("The cluster id used for identifying the unique flink 
cluster. If it's not set, the client will generate " +
+   "a random UUID name.")
.build();
 
public static final Option IMAGE_OPTION = Option.builder("i")
@@ -54,7 +53,7 @@ public class KubernetesCliOptions {
.required(false)
.hasArg(true)
.argName("image-name")
-   
.desc(KubernetesConfigOptions.CONTAINER_IMAGE.description().toString())
+   .desc("Image to use for Flink containers")
.build();
 
public static final Option JOB_MANAGER_MEMORY_OPTION = 
Option.builder("jm")
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index d65d4fb..6136d5e 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -97,7 +97,7 @@ public class KubernetesConfigOptions {
key("kubernetes.cluster-id")
.stringType()
.noDefaultValue()
-   .withDescription("The cluster id that will be used for flink 
cluster. If it's not set, " +
+   .withDescription("The cluster id used for identifying the 
unique flink cluster. If it's not set, " +
"the client will generate a random UUID name.");
 
public static final ConfigOption CONTAINER_IMAGE =



[flink] branch release-1.10 updated (97899cf -> 8f7ad84)

2019-12-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 97899cf  [FLINK-15008][build] Bundle javax.activation-api for Java 11
 new ef4ad44  [hotfix][kubernetes] Remove per-job related config options
 new c165191  [hotfix][kubernetes] Correct the description of cli options
 new d61ef1c  [hotfix][kubernetes] Separate creating and retrieving flink 
cluster logs
 new 8f7ad84  [FLINK-15153][kubernetes] Service selector needs to contain 
jobmanager component label

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/kubernetes_config_configuration.html |  2 +-
 .../kubernetes/KubernetesClusterDescriptor.java| 40 ++
 .../kubernetes/cli/FlinkKubernetesCustomCli.java   | 28 ---
 .../flink/kubernetes/cli/KubernetesCliOptions.java | 23 ++---
 .../configuration/KubernetesConfigOptions.java |  2 +-
 .../kubeclient/decorators/ServiceDecorator.java|  9 -
 .../kubernetes/kubeclient/Fabric8ClientTest.java   |  3 ++
 7 files changed, 35 insertions(+), 72 deletions(-)



[flink] 01/11: [FLINK-14926][state-backend-rocksdb] Ensure that RocksObjects are always closed on backend disposal

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e05618b427f22cb73694658f5a21fb1767709efa
Author: Yu Li 
AuthorDate: Sun Nov 24 16:20:09 2019 +0800

[FLINK-14926][state-backend-rocksdb] Ensure that RocksObjects are always 
closed on backend disposal

This also ensures that the newly introduced shared resources (shared cache and write buffers) are properly closed when the RocksDB state backend is disposed.

This closes #10300
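
A minimal sketch of the close-tracking idea (a simplified stand-in, not the actual RocksDBResourceContainer): every native RocksObject created while assembling options is remembered and released when the owning container is disposed.

    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksObject;

    import java.util.ArrayList;
    import java.util.List;

    // Simplified stand-in for the disposal pattern; class and method names are illustrative.
    public final class ClosingContainerSketch implements AutoCloseable {

        private final List<RocksObject> handlesToClose = new ArrayList<>();

        DBOptions createDbOptions() {
            DBOptions options = new DBOptions().setUseFsync(false);
            handlesToClose.add(options); // remember the native handle so disposal can free it
            return options;
        }

        @Override
        public void close() {
            // Release in reverse creation order; each close() frees the underlying native object.
            for (int i = handlesToClose.size() - 1; i >= 0; i--) {
                handlesToClose.get(i).close();
            }
            handlesToClose.clear();
        }
    }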
---
 .../state/DefaultConfigurableOptionsFactory.java   |   5 +-
 .../contrib/streaming/state/OptionsFactory.java|  30 -
 .../contrib/streaming/state/PredefinedOptions.java |  99 
 .../streaming/state/RocksDBKeyedStateBackend.java  |  11 +-
 .../state/RocksDBKeyedStateBackendBuilder.java |  23 ++--
 .../streaming/state/RocksDBResourceContainer.java  | 122 
 .../streaming/state/RocksDBStateBackend.java   |  62 +-
 .../contrib/streaming/state/RocksDBResource.java   |  20 ++--
 .../state/RocksDBResourceContainerTest.java| 125 +
 .../state/RocksDBStateBackendConfigTest.java   |  19 ++--
 .../streaming/state/RocksDBStateBackendTest.java   |  15 +--
 .../contrib/streaming/state/RocksDBTestUtils.java  |  13 +--
 12 files changed, 442 insertions(+), 102 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 962a12e..12c0eb9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -31,6 +31,7 @@ import org.rocksdb.PlainTableConfig;
 import org.rocksdb.TableFormatConfig;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -59,7 +60,7 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFac
private final Map configuredOptions = new HashMap<>();
 
@Override
-   public DBOptions createDBOptions(DBOptions currentOptions) {
+   public DBOptions createDBOptions(DBOptions currentOptions, 
Collection handlesToClose) {
if (isOptionConfigured(MAX_BACKGROUND_THREADS)) {

currentOptions.setIncreaseParallelism(getMaxBackgroundThreads());
}
@@ -72,7 +73,7 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFac
}
 
@Override
-   public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {
+   public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions, Collection handlesToClose) {
if (isOptionConfigured(COMPACTION_STYLE)) {
currentOptions.setCompactionStyle(getCompactionStyle());
}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 3204580..79815b9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -21,6 +21,8 @@ package org.apache.flink.contrib.streaming.state;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
+import java.util.Collection;
+
 /**
  * A factory for {@link DBOptions} and {@link ColumnFamilyOptions} to be 
passed to the {@link RocksDBStateBackend}.
  * Options have to be created lazily by this factory, because the {@code 
Options}
@@ -52,9 +54,21 @@ public interface OptionsFactory extends java.io.Serializable 
{
 * the setter methods, otherwise the pre-defined options may get lost.
 *
 * @param currentOptions The options object with the pre-defined 
options.
+* @param handlesToClose The collection to register newly created 
{@link org.rocksdb.RocksObject}s.
+* @return The options object on which the additional options are set.
+*/
+   DBOptions createDBOptions(DBOptions currentOptions, 
Collection handlesToClose);
+
+   /**
+* Set the additional options on top of the current options object.
+*
+* @param currentOptions The options object with the pre-defined 
option

[flink] 04/11: [hotfix][tests] Ensure RocksDB native library is loaded into temp directory

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3820bdd364a86f2acebe54577a36c01499dfe326
Author: Stephan Ewen 
AuthorDate: Mon Dec 9 19:09:26 2019 +0100

[hotfix][tests] Ensure RocksDB native library is loaded into temp directory

This moves the loading code from one specific test method into the initialization phase. That way the extraction happens into the target location also in cases where other test methods execute first.
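
In outline, the resulting test setup looks like the sketch below (consolidated from the diff; the same TemporaryFolder/NativeLibraryLoader combination, with an illustrative class name):

    import org.junit.BeforeClass;
    import org.junit.ClassRule;
    import org.junit.rules.TemporaryFolder;
    import org.rocksdb.NativeLibraryLoader;

    import java.io.IOException;

    public class NativeLibraryLoadingSketchTest {

        @ClassRule
        public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

        // Runs once before any test method, so the library is always extracted into the
        // temporary target location, regardless of which test executes first.
        @BeforeClass
        public static void ensureRocksDbNativeLibraryLoaded() throws IOException {
            NativeLibraryLoader.getInstance().loadLibrary(TMP_FOLDER.newFolder().getAbsolutePath());
        }
    }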
---
 .../streaming/state/RocksDBResourceContainerTest.java| 16 
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index f72dd1a..5ec3c04 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -21,7 +21,8 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.util.function.ThrowingRunnable;
 
-import org.junit.Rule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyOptions;
@@ -30,6 +31,7 @@ import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.WriteBufferManager;
 
+import java.io.IOException;
 import java.util.ArrayList;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -40,8 +42,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
  */
 public class RocksDBResourceContainerTest {
 
-   @Rule
-   public final TemporaryFolder tmp = new TemporaryFolder();
+   @ClassRule
+   public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+   @BeforeClass
+   public static void ensureRocksDbNativeLibraryLoaded() throws 
IOException {
+   
NativeLibraryLoader.getInstance().loadLibrary(TMP_FOLDER.newFolder().getAbsolutePath());
+   }
+
+   // 

 
@Test
public void testFreeDBOptionsAfterClose() throws Exception {
@@ -108,7 +117,6 @@ public class RocksDBResourceContainerTest {
 
@Test
public void testFreeSharedResourcesAfterClose() throws Exception {
-   
NativeLibraryLoader.getInstance().loadLibrary(tmp.newFolder().getAbsolutePath());
RocksDBResourceContainer container = new 
RocksDBResourceContainer();
LRUCache cache = new LRUCache(1024L);
WriteBufferManager wbm = new WriteBufferManager(1024L, cache);



[flink] 08/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify test to rely on RocksDBResourceContainer for cleanup of native handles

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93deb1a709840c2ae8957ea88cd8ffc0929951ad
Author: Stephan Ewen 
AuthorDate: Mon Dec 9 20:22:08 2019 +0100

[FLINK-14926][state-backend-rocksdb] (follow-up) Simplify test to rely on 
RocksDBResourceContainer for cleanup of native handles
---
 .../contrib/streaming/state/RocksDBResourceContainer.java|  6 ++
 .../flink/contrib/streaming/state/RocksDBStateBackend.java   | 12 +---
 .../contrib/streaming/state/RocksDBStateBackendTest.java | 12 
 3 files changed, 11 insertions(+), 19 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 32c198f..20c3c5c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -106,6 +106,12 @@ final class RocksDBResourceContainer implements 
AutoCloseable {
return opt;
}
 
+   RocksDBNativeMetricOptions 
getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
+   return optionsFactory == null
+   ? defaultMetricOptions
+   : 
optionsFactory.createNativeMetricsOptions(defaultMetricOptions);
+   }
+
@Nullable
OptionsFactory getOptionsFactory() {
return optionsFactory;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 829566d..3ffb021 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -560,7 +560,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu

.setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())

.setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled())

.setNumberOfTransferingThreads(getNumberOfTransferThreads())
-   
.setNativeMetricOptions(getMemoryWatcherOptions(resourceContainer));
+   
.setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(defaultMetricOptions));
return builder.build();
}
 
@@ -853,16 +853,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
return resourceContainer.getColumnOptions();
}
 
-   public RocksDBNativeMetricOptions 
getMemoryWatcherOptions(RocksDBResourceContainer resourceContainer) {
-   RocksDBNativeMetricOptions options = this.defaultMetricOptions;
-   OptionsFactory optionsFactory = 
resourceContainer.getOptionsFactory();
-   if (optionsFactory != null) {
-   options = 
optionsFactory.createNativeMetricsOptions(options);
-   }
-
-   return options;
-   }
-
/**
 * Gets the number of threads used to transfer files while 
snapshotting/restoring.
 */
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index e59f6e1..b31e190 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -115,14 +115,12 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase handlesToClose = new ArrayList<>();
+   private final RocksDBResourceContainer optionsContainer = new 
RocksDBResourceContainer();
 
public void prepareRocksDB() throws Exception {
String dbPath = new File(tempFolder.newFolder(), 
DB_INSTANCE_DIR_STRING).getAbsolutePath();
-   columnOptions = 
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose);
-   optionsContainer = new RocksDBResourceContainer();
+   ColumnFamil

[flink] 03/11: [hotfix][tests] Use assumptions rather than manual checks in RocksDBStateBackendConfigTest to report skipped tests properly

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 08554b9b1733fc148397c9179681f54a6cb24ef0
Author: Stephan Ewen 
AuthorDate: Mon Dec 9 20:32:13 2019 +0100

[hotfix][tests] Use assumptions rather than manual checks in RocksDBStateBackendConfigTest to report skipped tests properly
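
The pattern the change moves to, in a self-contained form (the test name below is illustrative): Assume.assumeTrue reports the test as skipped when the precondition cannot be satisfied, whereas the previous manual check with an early return made it look like a pass.

    import org.junit.Assume;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    import java.io.File;

    public class AssumptionSketchTest {

        @Rule
        public final TemporaryFolder tempFolder = new TemporaryFolder();

        @Test
        public void testNeedsNonWritableDirectory() throws Exception {
            final File targetDir = tempFolder.newFolder();

            // If the platform refuses to mark the directory non-writable, the test is
            // reported as skipped instead of silently returning and counting as passed.
            Assume.assumeTrue("Cannot mark directory non-writable", targetDir.setWritable(false, false));

            // ... assertions that rely on the non-writable directory would follow here.
        }
    }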
---
 .../state/RocksDBStateBackendConfigTest.java| 21 +++--
 1 file changed, 7 insertions(+), 14 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index b106000..bd2a34a 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.IOUtils;
 
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -311,16 +312,13 @@ public class RocksDBStateBackendConfigTest {
 
@Test
public void testFailWhenNoLocalStorageDir() throws Exception {
+   final File targetDir = tempFolder.newFolder();
+   Assume.assumeTrue("Cannot mark directory non-writable", 
targetDir.setWritable(false, false));
+
String checkpointPath = 
tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
-   File targetDir = tempFolder.newFolder();
 
try {
-   if (!targetDir.setWritable(false, false)) {
-   System.err.println("Cannot execute 
'testFailWhenNoLocalStorageDir' because cannot mark directory non-writable");
-   return;
-   }
-

rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
 
boolean hasFailure = false;
@@ -354,19 +352,14 @@ public class RocksDBStateBackendConfigTest {
 
@Test
public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
-   File targetDir1 = tempFolder.newFolder();
-   File targetDir2 = tempFolder.newFolder();
+   final File targetDir1 = tempFolder.newFolder();
+   final File targetDir2 = tempFolder.newFolder();
+   Assume.assumeTrue("Cannot mark directory non-writable", 
targetDir1.setWritable(false, false));
 
String checkpointPath = 
tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
 
try {
-
-   if (!targetDir1.setWritable(false, false)) {
-   System.err.println("Cannot execute 
'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory 
non-writable");
-   return;
-   }
-

rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), 
targetDir2.getAbsolutePath());
 
try {



[flink] 07/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Make RocksDBResourceContainer immutable

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0602ccf1d23c41bdbf335f6fb3e4cc39d7d5d7d1
Author: Stephan Ewen 
AuthorDate: Mon Dec 9 19:42:00 2019 +0100

[FLINK-14926][state-backend-rocksdb] (follow-up) Make 
RocksDBResourceContainer immutable
---
 .../streaming/state/RocksDBResourceContainer.java  | 50 +++---
 .../streaming/state/RocksDBStateBackend.java   | 31 +++---
 .../state/RocksDBResourceContainerTest.java|  7 ++-
 3 files changed, 43 insertions(+), 45 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 4cb18d8..32c198f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -24,11 +24,12 @@ import org.apache.flink.util.IOUtils;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The container for RocksDB resources, including predefined options, option 
factory and
  * shared resource among instances.
@@ -39,22 +40,37 @@ import java.util.ArrayList;
 final class RocksDBResourceContainer implements AutoCloseable {
 
/** The pre-configured option settings. */
-   private PredefinedOptions predefinedOptions;
+   private final PredefinedOptions predefinedOptions;
 
/** The options factory to create the RocksDB options. */
@Nullable
-   private OptionsFactory optionsFactory;
+   private final OptionsFactory optionsFactory;
 
/** The shared resource among RocksDB instances. This resource is not 
part of the 'handlesToClose',
 * because the handles to close are closed quietly, whereas for this 
one, we want exceptions to be reported. */
@Nullable
-   private OpaqueMemoryResource sharedResources;
+   private final OpaqueMemoryResource 
sharedResources;
 
/** The handles to be closed when the container is closed. */
private final ArrayList handlesToClose;
 
public RocksDBResourceContainer() {
-   handlesToClose = new ArrayList<>();
+   this(PredefinedOptions.DEFAULT, null, null);
+   }
+
+   public RocksDBResourceContainer(PredefinedOptions predefinedOptions, 
@Nullable OptionsFactory optionsFactory) {
+   this(predefinedOptions, optionsFactory, null);
+   }
+
+   public RocksDBResourceContainer(
+   PredefinedOptions predefinedOptions,
+   @Nullable OptionsFactory optionsFactory,
+   @Nullable OpaqueMemoryResource 
sharedResources) {
+
+   this.predefinedOptions = checkNotNull(predefinedOptions);
+   this.optionsFactory = optionsFactory;
+   this.sharedResources = sharedResources;
+   this.handlesToClose = new ArrayList<>();
}
 
/**
@@ -62,7 +78,7 @@ final class RocksDBResourceContainer implements AutoCloseable 
{
 */
DBOptions getDbOptions() {
// initial options from pre-defined profile
-   DBOptions opt = 
checkAndGetPredefinedOptions().createDBOptions(handlesToClose);
+   DBOptions opt = 
predefinedOptions.createDBOptions(handlesToClose);
 
// add user-defined options factory, if specified
if (optionsFactory != null) {
@@ -80,7 +96,7 @@ final class RocksDBResourceContainer implements AutoCloseable 
{
 */
ColumnFamilyOptions getColumnOptions() {
// initial options from pre-defined profile
-   ColumnFamilyOptions opt = 
checkAndGetPredefinedOptions().createColumnOptions(handlesToClose);
+   ColumnFamilyOptions opt = 
predefinedOptions.createColumnOptions(handlesToClose);
 
// add user-defined options, if specified
if (optionsFactory != null) {
@@ -90,29 +106,11 @@ final class RocksDBResourceContainer implements 
AutoCloseable {
return opt;
}
 
-   PredefinedOptions checkAndGetPredefinedOptions() {
-   if (predefinedOptions == null) {
-   predefinedOptions = PredefinedOptions.DEFAULT;
-   }
-   return predefinedOptions;
-   }
-
+   @Nullable
OptionsFactory getOptionsFactory() {
return optionsFactory;
}
 
-   void setPredefinedOpt

[flink] 10/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify collection of the option objects to close

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2405111bd62595e728669ad7753b1aa0be960ae6
Author: Stephan Ewen 
AuthorDate: Tue Dec 10 11:52:59 2019 +0100

[FLINK-14926][state-backend-rocksdb] (follow-up) Simplify collection of the option objects to close

  - Rather than letting each option factory method add the option to the list itself, add it in one place in the Resource Container (see the sketch after this list).
  - Assume the list passed to the factory methods is always non-null.
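
A compact sketch of the resulting placement under these assumptions (class and method names are illustrative, not the actual Flink ones):

    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksObject;

    import java.util.ArrayList;
    import java.util.List;

    public final class SingleRegistrationSketch {

        private final List<RocksObject> handlesToClose = new ArrayList<>();

        // After the change: factory-style methods only create and return the options ...
        private static DBOptions createDbOptions() {
            return new DBOptions().setUseFsync(false);
        }

        // ... and the container registers the handle for later cleanup in exactly one place.
        DBOptions getDbOptions() {
            DBOptions options = createDbOptions();
            handlesToClose.add(options);
            return options;
        }
    }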
---
 .../contrib/streaming/state/PredefinedOptions.java | 63 +-
 .../streaming/state/RocksDBResourceContainer.java  |  2 +
 2 files changed, 15 insertions(+), 50 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index a629756..47c7d12 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -25,6 +25,7 @@ import org.rocksdb.CompactionStyle;
 import org.rocksdb.DBOptions;
 import org.rocksdb.InfoLogLevel;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 /**
@@ -58,24 +59,15 @@ public enum PredefinedOptions {
 
@Override
public DBOptions createDBOptions(Collection 
handlesToClose) {
-   DBOptions dbOptions =
-   new DBOptions()
+   return new DBOptions()
.setUseFsync(false)

.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
.setStatsDumpPeriodSec(0);
-   if (handlesToClose != null) {
-   handlesToClose.add(dbOptions);
-   }
-   return dbOptions;
}
 
@Override
public ColumnFamilyOptions 
createColumnOptions(Collection handlesToClose) {
-   ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
-   if (handlesToClose != null) {
-   handlesToClose.add(columnFamilyOptions);
-   }
-   return columnFamilyOptions;
+   return new ColumnFamilyOptions();
}
 
},
@@ -106,29 +98,19 @@ public enum PredefinedOptions {
 
@Override
public DBOptions createDBOptions(Collection 
handlesToClose) {
-   DBOptions dbOptions =
-   new DBOptions()
+   return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)

.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
.setStatsDumpPeriodSec(0);
-   if (handlesToClose != null) {
-   handlesToClose.add(dbOptions);
-   }
-   return dbOptions;
}
 
@Override
public ColumnFamilyOptions 
createColumnOptions(Collection handlesToClose) {
-   ColumnFamilyOptions columnFamilyOptions =
-   new ColumnFamilyOptions()
+   return new ColumnFamilyOptions()

.setCompactionStyle(CompactionStyle.LEVEL)

.setLevelCompactionDynamicLevelBytes(true);
-   if (handlesToClose != null) {
-   handlesToClose.add(columnFamilyOptions);
-   }
-   return columnFamilyOptions;
}
},
 
@@ -164,18 +146,12 @@ public enum PredefinedOptions {
 
@Override
public DBOptions createDBOptions(Collection 
handlesToClose) {
-
-   DBOptions dbOptions =
-   new DBOptions()
+   return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)

.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
.setStatsDumpPeriodSec(0);
-   if (han

[flink] 11/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Replace OptionsFactory for RocksDBOptionsFactory

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07f08fb19a814cbac03b92e675fe567e7a62e510
Author: Stephan Ewen 
AuthorDate: Tue Dec 10 13:29:05 2019 +0100

[FLINK-14926][state-backend-rocksdb] (follow-up) Replace OptionsFactory for 
RocksDBOptionsFactory

OptionsFactory was breaking existing implementations because a method was added to the interface without a default implementation. To solve this, we keep OptionsFactory but replace it in the RocksDB state backend with RocksDBOptionsFactory, which has the evolved signature. An OptionsFactory is still accepted and is wrapped into a RocksDBOptionsFactory for compatibility.
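
The wrapping idea in miniature, using simplified stand-ins for the two interfaces so the sketch stays self-contained (the real adapter in this change is RocksDBOptionsFactoryAdapter; the signatures below are illustrative):

    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksObject;

    import java.util.Collection;

    public class OptionsFactoryAdapterSketch {

        // Stand-in for the legacy interface: no knowledge of handle tracking.
        interface LegacyOptionsFactory {
            DBOptions createDBOptions(DBOptions currentOptions);
        }

        // Stand-in for the evolved interface with the additional parameter.
        interface EvolvedOptionsFactory {
            DBOptions createDBOptions(DBOptions currentOptions, Collection<RocksObject> handlesToClose);
        }

        // Compatibility adapter: accept a legacy factory, expose the evolved signature.
        static final class LegacyAdapter implements EvolvedOptionsFactory {
            private final LegacyOptionsFactory legacy;

            LegacyAdapter(LegacyOptionsFactory legacy) {
                this.legacy = legacy;
            }

            @Override
            public DBOptions createDBOptions(DBOptions currentOptions, Collection<RocksObject> handlesToClose) {
                // The legacy factory cannot register handles, so that argument is simply not used.
                return legacy.createDBOptions(currentOptions);
            }
        }
    }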
---
 .../state/ConfigurableOptionsFactory.java  |  2 +-
 ...java => ConfigurableRocksDBOptionsFactory.java} |  4 +-
 .../state/DefaultConfigurableOptionsFactory.java   |  6 +-
 .../contrib/streaming/state/OptionsFactory.java| 51 ++---
 .../state/RocksDBConfigurableOptions.java  |  4 +-
 ...ionsFactory.java => RocksDBOptionsFactory.java} | 63 +---
 .../state/RocksDBOptionsFactoryAdapter.java| 75 ++
 .../streaming/state/RocksDBResourceContainer.java  |  8 +-
 .../streaming/state/RocksDBStateBackend.java   | 69 +++--
 .../RocksDBOptionsFactoryCompatibilityTest.java| 88 ++
 .../contrib/streaming/state/RocksDBResource.java   |  6 +-
 .../state/RocksDBStateBackendConfigTest.java   | 13 ++--
 12 files changed, 273 insertions(+), 116 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
index 80d1a1f..343eda8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.configuration.Configuration;
 
 /**
- * An interface for options factory that pick up additional parameters from a 
configuration.
+ * @deprecated Replaced by {@link ConfigurableRocksDBOptionsFactory}.
  */
 public interface ConfigurableOptionsFactory extends OptionsFactory {
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableRocksDBOptionsFactory.java
similarity index 91%
copy from 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
copy to 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableRocksDBOptionsFactory.java
index 80d1a1f..ac7b882 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableRocksDBOptionsFactory.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
 /**
  * An interface for options factory that pick up additional parameters from a 
configuration.
  */
-public interface ConfigurableOptionsFactory extends OptionsFactory {
+public interface ConfigurableRocksDBOptionsFactory extends 
RocksDBOptionsFactory {
 
/**
 * Creates a variant of the options factory that applies additional 
configuration parameters.
@@ -35,5 +35,5 @@ public interface ConfigurableOptionsFactory extends 
OptionsFactory {
 * @param configuration The configuration to pick the values from.
 * @return A reconfigured options factory.
 */
-   OptionsFactory configure(Configuration configuration);
+   RocksDBOptionsFactory configure(Configuration configuration);
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 93753b2..aaa9e83 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -51,10 +51,10 @@ import sta

[flink] 09/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Avoid exposing handles that will not be closed from RocksDBStateBackend

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31dd8c36246c86db3fb07790158a8c9fe373b726
Author: Stephan Ewen 
AuthorDate: Tue Dec 10 11:17:47 2019 +0100

[FLINK-14926][state-backend-rocksdb] (follow-up) Avoid exposing handles 
that will not be closed from RocksDBStateBackend

That means that RocksDBStateBackend should not have accessors that expose created native handles. The native handles should only be created once the Resource Container is created. This implies removing some test methods from RocksDBStateBackend and changing the tests to test against the RocksDBResourceContainer instead.
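
As a rough sketch of what testing against the container means under these assumptions (the container class below is a stand-in exposing only a getDbOptions() accessor; all names are illustrative):

    import org.junit.Test;
    import org.rocksdb.DBOptions;

    import static org.junit.Assert.assertFalse;

    public class ContainerBasedTestSketch {

        // Stand-in: options are created (and tracked for cleanup) only inside the container,
        // never handed out by the state backend itself.
        static final class ContainerStandIn implements AutoCloseable {
            private DBOptions dbOptions;

            DBOptions getDbOptions() {
                dbOptions = new DBOptions().setUseFsync(false);
                return dbOptions;
            }

            @Override
            public void close() {
                if (dbOptions != null) {
                    dbOptions.close();
                }
            }
        }

        @Test
        public void optionsAreAssertedOnTheContainer() throws Exception {
            try (ContainerStandIn container = new ContainerStandIn()) {
                DBOptions options = container.getDbOptions();
                assertFalse(options.useFsync()); // assertions target the container's options
            }
        }
    }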
---
 .../streaming/state/RocksDBResourceContainer.java  |  4 ++
 .../streaming/state/RocksDBStateBackend.java   | 48 +++---
 .../state/RocksDBStateBackendConfigTest.java   | 59 +-
 3 files changed, 46 insertions(+), 65 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 20c3c5c..199cd84 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -112,6 +112,10 @@ final class RocksDBResourceContainer implements 
AutoCloseable {
: 
optionsFactory.createNativeMetricsOptions(defaultMetricOptions);
}
 
+   PredefinedOptions getPredefinedOptions() {
+   return predefinedOptions;
+   }
+
@Nullable
OptionsFactory getOptionsFactory() {
return optionsFactory;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 3ffb021..b572deb 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -131,9 +131,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
private File[] localRocksDbDirectories;
 
/** The pre-configured option settings. */
+   @Nullable
private PredefinedOptions predefinedOptions;
 
/** The options factory to create the RocksDB options in the cluster. */
+   @Nullable
private OptionsFactory optionsFactory;
 
/** This determines if incremental checkpointing is enabled. */
@@ -505,8 +507,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
final OpaqueMemoryResource 
sharedResources = RocksDBOperationUtils

.allocateSharedCachesIfConfigured(memoryConfiguration, env.getMemoryManager(), 
LOG);
 
-   final RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer(
-   getConfiguredPredefinedOptionsOrDefault(), 
optionsFactory, sharedResources);
+   final RocksDBResourceContainer resourceContainer = 
createOptionsAndResourceContainer(sharedResources);
 
final DBOptions dbOptions = resourceContainer.getDbOptions();
final Function createColumnOptions;
@@ -826,34 +827,12 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
 *
 * @return The options factory.
 */
-   @VisibleForTesting
+   @Nullable
public OptionsFactory getOptions() {
return optionsFactory;
}
 
/**
-* Gets the RocksDB {@link DBOptions} to be used for all RocksDB 
instances. Only for testing.
-*/
-   @VisibleForTesting
-   public DBOptions getDbOptions() {
-   RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer(
-   getConfiguredPredefinedOptionsOrDefault(),
-   optionsFactory);
-   return resourceContainer.getDbOptions();
-   }
-
-   /**
-* Gets the RocksDB {@link ColumnFamilyOptions} to be used for all 
RocksDB instances. Only for testing.
-*/
-   @VisibleForTesting
-   public ColumnFamilyOptions getColumnOptions() {
-   RocksDBResourceContainer resourceContainer = new 
RocksDBResourceContainer(
-   getConfigur

[flink] 06/11: [hotfix][state-backend-rocksdb] Some minor style cleanups in RocksDBResourceContainer

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6955ba77dd11c99cde31c990a4d2c9f8ee57232a
Author: Stephan Ewen 
AuthorDate: Mon Dec 9 18:31:58 2019 +0100

[hotfix][state-backend-rocksdb] Some minor style cleanups in 
RocksDBResourceContainer

  - Make visibility consistently package-private (was a mix of package-private and public)
  - Remove an unused method
  - Adjust line spacing between fields
  - Add some JavaDocs
  - Make the class final (not designed for inheritance)
  - Add Nullable annotations
---
 .../streaming/state/RocksDBResourceContainer.java   | 21 -
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 6e0f23c..4cb18d8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -25,25 +25,32 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 
 /**
  * The container for RocksDB resources, including predefined options, option 
factory and
  * shared resource among instances.
- * 
- * This should be the only entrance for {@link RocksDBStateBackend} to get 
RocksDB options,
+ *
+ * This should be the only entrance for {@link RocksDBStateBackend} to get 
RocksDB options,
  * and should be properly (and necessarily) closed to prevent resource leak.
  */
-public class RocksDBResourceContainer implements AutoCloseable {
+final class RocksDBResourceContainer implements AutoCloseable {
 
/** The pre-configured option settings. */
private PredefinedOptions predefinedOptions;
+
/** The options factory to create the RocksDB options. */
+   @Nullable
private OptionsFactory optionsFactory;
-   /** The shared resource among RocksDB instances. */
+
+   /** The shared resource among RocksDB instances. This resource is not 
part of the 'handlesToClose',
+* because the handles to close are closed quietly, whereas for this 
one, we want exceptions to be reported. */
+   @Nullable
private OpaqueMemoryResource sharedResources;
 
+   /** The handles to be closed when the container is closed. */
private final ArrayList handlesToClose;
 
public RocksDBResourceContainer() {
@@ -71,7 +78,7 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
/**
 * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all 
RocksDB instances.
 */
-   public ColumnFamilyOptions getColumnOptions() {
+   ColumnFamilyOptions getColumnOptions() {
// initial options from pre-defined profile
ColumnFamilyOptions opt = 
checkAndGetPredefinedOptions().createColumnOptions(handlesToClose);
 
@@ -83,10 +90,6 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
return opt;
}
 
-   PredefinedOptions getPredefinedOptions() {
-   return predefinedOptions;
-   }
-
PredefinedOptions checkAndGetPredefinedOptions() {
if (predefinedOptions == null) {
predefinedOptions = PredefinedOptions.DEFAULT;



[flink] branch master updated (eddad99 -> 07f08fb)

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from eddad99  [FLINK-15008][legal] Update licensing
 new e05618b  [FLINK-14926][state-backend-rocksdb] Ensure that RocksObjects 
are always closed on backend disposal
 new 6560929  [hotfix][tests] Remove unnecessary temp directory handling
 new 08554b9  [hotfix][tests] Use assumptions rather than manual checks in RocksDBStateBackendConfigTest to report skipped tests properly
 new 3820bdd  [hotfix][tests] Ensure RocksDB native library is loaded into 
temp directory
 new fdc5ed5  [FLINK-15177][state-backend-rocksdb] Migrate RocksDB 
Configurable Options to new type safe config options
 new 6955ba7  [hotfix][state-backend-rocksdb] Some minor style cleanups in 
RocksDBResourceContainer
 new 0602ccf  [FLINK-14926][state-backend-rocksdb] (follow-up) Make 
RocksDBResourceContainer immutable
 new 93deb1a  [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify 
test to rely on RocksDBResourceContainer for cleanup of native handles
 new 31dd8c3  [FLINK-14926][state-backend-rocksdb] (follow-up) Avoid 
exposing handles that will not be closed from RocksDBStateBackend
 new 2405111  [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify collection of the option objects to close
 new 07f08fb  [FLINK-14926][state-backend-rocksdb] (follow-up) Replace 
OptionsFactory for RocksDBOptionsFactory

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../state/ConfigurableOptionsFactory.java  |   2 +-
 ...java => ConfigurableRocksDBOptionsFactory.java} |   4 +-
 .../state/DefaultConfigurableOptionsFactory.java   | 105 +++-
 .../contrib/streaming/state/OptionsFactory.java|  21 +---
 .../contrib/streaming/state/PredefinedOptions.java |  48 ++--
 .../state/RocksDBConfigurableOptions.java  |  40 --
 .../streaming/state/RocksDBKeyedStateBackend.java  |  11 +-
 .../state/RocksDBKeyedStateBackendBuilder.java |  23 ++--
 ...ionsFactory.java => RocksDBOptionsFactory.java} |  49 ++--
 .../state/RocksDBOptionsFactoryAdapter.java|  75 
 .../streaming/state/RocksDBResourceContainer.java  | 135 +
 .../streaming/state/RocksDBStateBackend.java   | 130 ++--
 .../RocksDBOptionsFactoryCompatibilityTest.java|  88 ++
 .../contrib/streaming/state/RocksDBResource.java   |  26 ++--
 .../state/RocksDBResourceContainerTest.java| 132 
 .../state/RocksDBStateBackendConfigTest.java   | 134 +---
 .../streaming/state/RocksDBStateBackendTest.java   |  27 ++---
 .../contrib/streaming/state/RocksDBTestUtils.java  |  13 +-
 18 files changed, 757 insertions(+), 306 deletions(-)
 copy 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/{ConfigurableOptionsFactory.java
 => ConfigurableRocksDBOptionsFactory.java} (91%)
 copy 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/{OptionsFactory.java
 => RocksDBOptionsFactory.java} (62%)
 create mode 100644 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
 create mode 100644 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 create mode 100644 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryCompatibilityTest.java
 create mode 100644 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java



[flink] 02/11: [hotfix][tests] Remove unnecessary temp directory handling

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6560929dba26c1a725bb7f8e8f35e3b5dafef42f
Author: Stephan Ewen 
AuthorDate: Mon Dec 9 20:18:17 2019 +0100

[hotfix][tests] Remove unnecessary temp directory handling

Remove the use of an unneeded temp directory and rely on JUnit tooling rather than manual cleanup.
---
 .../contrib/streaming/state/RocksDBStateBackendTest.java | 12 ++--
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index be99c51..e59f6e1 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -114,16 +114,13 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase handlesToClose = new ArrayList<>();
 
public void prepareRocksDB() throws Exception {
-   instanceBasePath = tempFolder.newFolder();
-   instanceBasePath.mkdirs();
-   String dbPath = new File(instanceBasePath, 
DB_INSTANCE_DIR_STRING).getAbsolutePath();
+   String dbPath = new File(tempFolder.newFolder(), 
DB_INSTANCE_DIR_STRING).getAbsolutePath();
columnOptions = 
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose);
optionsContainer = new RocksDBResourceContainer();
ArrayList columnFamilyHandles = new 
ArrayList<>(1);
@@ -170,11 +167,6 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase

[flink] 05/11: [FLINK-15177][state-backend-rocksdb] Migrate RocksDB Configurable Options to new type safe config options

2019-12-11 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fdc5ed546360fe438f81c564265f8d623f88f02e
Author: Stephan Ewen 
AuthorDate: Tue Dec 10 12:19:59 2019 +0100

[FLINK-15177][state-backend-rocksdb] Migrate RocksDB Configurable Options 
to new type safe config options

This also simplifies the validation logic.
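
The shape of the migration in miniature, using Flink's ConfigOptions builder: string-keyed candidate lists become typed ConfigOption instances, so validation no longer needs to parse raw strings. The key, default value and description below are illustrative, not the actual RocksDBConfigurableOptions entries.

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    public class TypedOptionsSketch {

        // Before: candidate options tracked by raw string keys.
        static final String[] CANDIDATE_KEYS = {"state.backend.rocksdb.thread.num"};

        // After: candidate options tracked as typed ConfigOption instances.
        static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
            ConfigOptions.key("state.backend.rocksdb.thread.num")
                .intType()
                .defaultValue(2)
                .withDescription("Illustrative option; the typed builder replaces string parsing and manual validation.");

        static final ConfigOption<?>[] CANDIDATE_CONFIGS = {MAX_BACKGROUND_THREADS};
    }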
---
 .../state/DefaultConfigurableOptionsFactory.java   | 94 +-
 .../state/RocksDBConfigurableOptions.java  | 36 ++---
 .../state/RocksDBStateBackendConfigTest.java   | 24 +++---
 3 files changed, 75 insertions(+), 79 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 12c0eb9..93753b2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -32,12 +32,11 @@ import org.rocksdb.TableFormatConfig;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_CACHE_SIZE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_SIZE;
@@ -57,6 +56,8 @@ import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOption
  */
 public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFactory {
 
+   private static final long serialVersionUID = 1L;
+
private final Map configuredOptions = new HashMap<>();
 
@Override
@@ -131,7 +132,7 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFac
return new HashMap<>(configuredOptions);
}
 
-   private boolean isOptionConfigured(ConfigOption configOption) {
+   private boolean isOptionConfigured(ConfigOption configOption) {
return configuredOptions.containsKey(configOption.key());
}
 
@@ -300,44 +301,37 @@ public class DefaultConfigurableOptionsFactory implements 
ConfigurableOptionsFac
return this;
}
 
-   private static final String[] CANDIDATE_CONFIGS = new String[]{
+   private static final ConfigOption[] CANDIDATE_CONFIGS = new 
ConfigOption[] {
// configurable DBOptions
-   MAX_BACKGROUND_THREADS.key(),
-   MAX_OPEN_FILES.key(),
+   MAX_BACKGROUND_THREADS,
+   MAX_OPEN_FILES,
 
// configurable ColumnFamilyOptions
-   COMPACTION_STYLE.key(),
-   USE_DYNAMIC_LEVEL_SIZE.key(),
-   TARGET_FILE_SIZE_BASE.key(),
-   MAX_SIZE_LEVEL_BASE.key(),
-   WRITE_BUFFER_SIZE.key(),
-   MAX_WRITE_BUFFER_NUMBER.key(),
-   MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(),
-   BLOCK_SIZE.key(),
-   BLOCK_CACHE_SIZE.key()
+   COMPACTION_STYLE,
+   USE_DYNAMIC_LEVEL_SIZE,
+   TARGET_FILE_SIZE_BASE,
+   MAX_SIZE_LEVEL_BASE,
+   WRITE_BUFFER_SIZE,
+   MAX_WRITE_BUFFER_NUMBER,
+   MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
+   BLOCK_SIZE,
+   BLOCK_CACHE_SIZE
};
 
-   private static final Set<String> POSITIVE_INT_CONFIG_SET = new 
HashSet<>(Arrays.asList(
-   MAX_BACKGROUND_THREADS.key(),
-   MAX_WRITE_BUFFER_NUMBER.key(),
-   MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key()
-   ));
-
-   private static final Set<String> SIZE_CONFIG_SET = new 
HashSet<>(Arrays.asList(
-   TARGET_FILE_SIZE_BASE.key(),
-   MAX_SIZE_LEVEL_BASE.key(),
-   WRITE_BUFFER_SIZE.key(),
-   BLOCK_SIZE.key(),
-   BLOCK_CACHE_SIZE.key()
+   private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET = new 
HashSet<>(Arrays.asList(
+   MAX_BACKGROUND_THREADS,
+   MAX_WRITE_BUFFER_NUMBER,
+   MIN_WRITE_BUFFER_NUMBER_TO_MERGE
));
 
-   private static final Set<String> BOOLEAN_CONFIG_SET = new 
HashSet<>(Collections.singletonList(
-   USE_DYNAMIC_LEVEL_SIZE.key()
+   private static final Set<ConfigOption<?>> SIZE_CONFIG_SET = new 
HashSet<>(Arrays.asList(
+   TARGET_FILE_SIZE_BASE,
+   MAX_SIZE_LEVEL_BASE,
+   WRITE_BUFFER_SIZE,
+ 
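For context, a minimal, self-contained sketch of the typed-option style this
commit migrates to (the option names and defaults are illustrative only, not
the actual RocksDB options; it assumes the ConfigOptions builder API available
in flink-core as of 1.10):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class TypedOptionsSketch {

	// Before this change the factory tracked plain String keys; with typed
	// options, validation can dispatch on the ConfigOption object itself
	// instead of re-parsing strings.
	static final ConfigOption<Integer> DEMO_THREAD_NUM =
		ConfigOptions.key("demo.thread.num").intType().defaultValue(2);

	static final ConfigOption<MemorySize> DEMO_WRITE_BUFFER_SIZE =
		ConfigOptions.key("demo.writebuffer.size").memoryType().defaultValue(MemorySize.parse("64mb"));

	static final Set<ConfigOption<?>> POSITIVE_INT_OPTIONS =
		new HashSet<>(Arrays.asList(DEMO_THREAD_NUM));

	static final Set<ConfigOption<?>> SIZE_OPTIONS =
		new HashSet<>(Arrays.asList(DEMO_WRITE_BUFFER_SIZE));

	public static void main(String[] args) {
		for (ConfigOption<?> option : POSITIVE_INT_OPTIONS) {
			System.out.println(option.key() + " defaults to " + option.defaultValue());
		}
		for (ConfigOption<?> option : SIZE_OPTIONS) {
			System.out.println(option.key() + " defaults to " + option.defaultValue());
		}
	}
}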

[flink-web] branch asf-site updated (20e5a8b -> b69c6fe)

2019-12-11 Thread fhueske
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from 20e5a8b  Rebuild website
 new d23026a  [FLINK-14213] Replace link to "Local Setup Tutorial" by link 
to "Getting Started Overview".
 new b69c6fe  Rebuild website

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 _data/i18n.yml  | 6 --
 _includes/navbar.html   | 4 ++--
 content/2019/05/03/pulsar-flink.html| 4 ++--
 content/2019/05/14/temporal-tables.html | 4 ++--
 content/2019/05/19/state-ttl.html   | 4 ++--
 content/2019/06/05/flink-network-stack.html | 4 ++--
 content/2019/06/26/broadcast-state.html | 4 ++--
 content/2019/07/23/flink-network-stack-2.html   | 4 ++--
 content/blog/index.html | 4 ++--
 content/blog/page10/index.html  | 4 ++--
 content/blog/page2/index.html   | 4 ++--
 content/blog/page3/index.html   | 4 ++--
 content/blog/page4/index.html   | 4 ++--
 content/blog/page5/index.html   | 4 ++--
 content/blog/page6/index.html   | 4 ++--
 content/blog/page7/index.html   | 4 ++--
 content/blog/page8/index.html   | 4 ++--
 content/blog/page9/index.html   | 4 ++--
 content/blog/release_1.0.0-changelog_known_issues.html  | 4 ++--
 content/blog/release_1.1.0-changelog.html   | 4 ++--
 content/blog/release_1.2.0-changelog.html   | 4 ++--
 content/blog/release_1.3.0-changelog.html   | 4 ++--
 content/community.html  | 4 ++--
 content/contributing/code-style-and-quality-common.html | 4 ++--
 content/contributing/code-style-and-quality-components.html | 4 ++--
 content/contributing/code-style-and-quality-formatting.html | 4 ++--
 content/contributing/code-style-and-quality-java.html   | 4 ++--
 content/contributing/code-style-and-quality-preamble.html   | 4 ++--
 content/contributing/code-style-and-quality-pull-requests.html  | 4 ++--
 content/contributing/code-style-and-quality-scala.html  | 4 ++--
 content/contributing/contribute-code.html   | 4 ++--
 content/contributing/contribute-documentation.html  | 4 ++--
 content/contributing/how-to-contribute.html | 4 ++--
 content/contributing/improve-website.html   | 4 ++--
 content/contributing/reviewing-prs.html | 4 ++--
 content/documentation.html  | 4 ++--
 content/downloads.html  | 4 ++--
 content/ecosystem.html  | 4 ++--
 content/faq.html| 4 ++--
 content/feature/2019/09/13/state-processor-api.html | 4 ++--
 content/features/2017/07/04/flink-rescalable-state.html | 4 ++--
 content/features/2018/01/30/incremental-checkpointing.html  | 4 ++--
 .../features/2018/03/01/end-to-end-exactly-once-apache-flink.html   | 4 ++--
 content/features/2019/03/11/prometheus-monitoring.html  | 4 ++--
 content/flink-applications.html | 4 ++--
 content/flink-architecture.html | 4 ++--
 content/flink-operations.html   | 4 ++--
 content/gettinghelp.html| 4 ++--
 content/index.html  | 4 ++--
 content/material.html   | 4 ++--
 content/news/2014/08/26/release-0.6.html| 4 ++--
 content/news/2014/09/26/release-0.6.1.html  | 4 ++--
 content/news/2014/10/03/upcoming_events.html| 4 ++--
 content/news/2014/11/04/release-0.7.0.html  | 4 ++--
 content/news/2014/11/18/hadoop-compatibility.html   | 4 ++--
 content/news/2015/01/06/december-in-flink.html  | 4 ++--
 content/news/2015

[flink-web] 01/02: [FLINK-14213] Replace link to "Local Setup Tutorial" by link to "Getting Started Overview".

2019-12-11 Thread fhueske
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit d23026ac16470157ad8b50e1d753f1471dae593e
Author: Fabian Hueske 
AuthorDate: Wed Sep 25 15:19:40 2019 +0200

[FLINK-14213] Replace link to "Local Setup Tutorial" by link to "Getting 
Started Overview".

This closes #272.
---
 _data/i18n.yml| 6 --
 _includes/navbar.html | 4 ++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/_data/i18n.yml b/_data/i18n.yml
index 9db8599..6c43ccf 100644
--- a/_data/i18n.yml
+++ b/_data/i18n.yml
@@ -7,7 +7,7 @@ en:
 powered_by: Powered By
 faq: FAQ
 downloads: Downloads
-tutorials: Tutorials
+getting_started: Getting Started
 documentation: Documentation
 getting_help: Getting Help
 ecosystem: Ecosystem
@@ -20,6 +20,7 @@ en:
 contribute_docs: Contribute Documentation
 contribute_website: Contribute to the Website
 roadmap: Roadmap
+tutorials: Tutorials
 
 zh:
 what_is_flink: Apache Flink 是什么?
@@ -30,7 +31,7 @@ zh:
 powered_by: Flink 用户
 faq: 常见问题
 downloads: 下载
-tutorials: 教程
+getting_started: 教程
 documentation: 文档
 getting_help: 获取帮助
 ecosystem: Ecosystem
@@ -43,3 +44,4 @@ zh:
 contribute_docs: 贡献文档
 contribute_website: 贡献网站
 roadmap: 开发计划
+tutorials: 教程
\ No newline at end of file
diff --git a/_includes/navbar.html b/_includes/navbar.html
index 71539bd..3acf26e 100755
--- a/_includes/navbar.html
+++ b/_includes/navbar.html
@@ -63,9 +63,9 @@
 
 {{ 
site.data.i18n[page.language].downloads }}
 
-
+
 
-  {{ 
site.data.i18n[page.language].tutorials }} 
+  {{ 
site.data.i18n[page.language].getting_started }} 
 
 
 



[flink] branch release-1.10 updated: [FLINK-15008][build] Bundle javax.activation-api for Java 11

2019-12-11 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new 97899cf  [FLINK-15008][build] Bundle javax.activation-api for Java 11
97899cf is described below

commit 97899cf0a76e163820e1d32f9ba715e742315012
Author: Chesnay Schepler 
AuthorDate: Fri Dec 6 17:53:50 2019 +0100

[FLINK-15008][build] Bundle javax.activation-api for Java 11
---
 flink-dist/pom.xml |  30 -
 flink-dist/src/main/resources/META-INF/NOTICE  |   3 +-
 .../META-INF/licenses/LICENSE.javax.activation | 135 +
 .../src/main/resources/META-INF/NOTICE |   4 +-
 pom.xml|  12 +-
 5 files changed, 175 insertions(+), 9 deletions(-)

diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 11255cb..2442f6f 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -398,6 +398,16 @@ under the License.
provided

 
+   
+   
+   javax.activation
+   javax.activation-api
+   1.2.0
+   
+   
+   provided
+   
+

 

@@ -580,7 +590,7 @@ under the License.
maven-dependency-plugin


-   copy-jaxb-jar
+   copy-javax-jars
process-resources

copy
@@ -596,6 +606,13 @@ under the License.
jar

true

+   
+   
javax.activation
+   
javax.activation-api
+   
${javax.activation.api.version}
+   jar
+   
true
+   


${project.build.directory}/temporary

@@ -606,16 +623,19 @@ under the License.
maven-antrun-plugin


-   unpack-jaxb-library
+   unpack-javax-libraries
process-resources

run



-   
-   
+   
+   
+   

+   

+   

+   



diff --git a/flink-dist/src/main/resources/META-INF/NOTICE 
b/flink-dist/src/main/resources/META-INF/NOTICE
index 2714eda..be9dd91 100644
--- a/flink-dist/src/main/resources/META-INF/NOTICE
+++ b/flink-dist/src/main/resources/META-INF/NOTICE
@@ -53,7 +53,8 @@ See bundled license files for details.
 This project bundles the following dependencies under the CDDL 1.1 license.
 See bundled license files for details.
 
-- javax.xml.bind:jaxb-api:2.3.0
+- javax.activation:javax.activation-api:1.2.0
+- javax.xml.bind:jaxb-api:2.3.1
 
 This project bundles "org.tukaani:xz:1.5".
 This Java implementation of XZ has been put into the public domain, thus you 
can do
diff --git 
a/flink-dist/src/main/resources/META-INF/licenses/LICENSE.javax.activation 
b/flink-dist/src/main/resources/META-INF/licenses/LICENSE.javax.activation
new file mode 100644
index 000..fd16ea9
--- /dev/null
+++ b/flink-dist/src/main/resource
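For background, javax.activation was removed from the JDK in Java 11, which is
why flink-dist now bundles javax.activation:javax.activation-api:1.2.0. A tiny
probe like the following (hypothetical, not part of the commit) only resolves
on Java 11 when that jar is on the classpath:

import javax.activation.MimetypesFileTypeMap;

public class ActivationApiProbe {

	public static void main(String[] args) {
		// On Java 8 these classes ship with the JDK; on Java 11 they are gone,
		// so this compiles and runs only if javax.activation-api is provided,
		// e.g. via the jar that flink-dist now unpacks into the distribution.
		MimetypesFileTypeMap typeMap = new MimetypesFileTypeMap();
		System.out.println(typeMap.getContentType("checkpoint-metadata.xml"));
	}
}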

[flink] branch master updated (2c11291 -> 886bf6f)

2019-12-11 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2c11291  [FLINK-14951][tests] Harden the thread safety of State TTL 
backend tests
 add 886bf6f  [FLINK-15008][build] Bundle javax.activation-api for Java 11

No new revisions were added by this update.

Summary of changes:
 flink-dist/pom.xml | 30 ++
 .../{LICENSE.jaxb => LICENSE.javax.activation} |  0
 pom.xml| 12 -
 3 files changed, 36 insertions(+), 6 deletions(-)
 copy flink-dist/src/main/resources/META-INF/licenses/{LICENSE.jaxb => 
LICENSE.javax.activation} (100%)



[flink] branch release-1.8 updated: [FLINK-14951][tests] Harden the thread safety of State TTL backend tests

2019-12-11 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new dfcbf68  [FLINK-14951][tests] Harden the thread safety of State TTL 
backend tests
dfcbf68 is described below

commit dfcbf68dbd792fc27c997078be7a59a594005d8b
Author: Yangze Guo 
AuthorDate: Tue Dec 10 10:03:49 2019 +0800

[FLINK-14951][tests] Harden the thread safety of State TTL backend tests
---
 .../streaming/tests/MonotonicTTLTimeProvider.java  | 36 ++
 .../streaming/tests/TtlVerifyUpdateFunction.java   | 21 +
 2 files changed, 30 insertions(+), 27 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
index 24bc9dc..c2e66f1 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
@@ -19,12 +19,15 @@
 package org.apache.flink.streaming.tests;
 
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A stub implementation of a {@link TtlTimeProvider} which guarantees that
  * processing time increases monotonically.
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements 
TtlTimeProvider, Serializable {
private static final Object lock = new Object();
 
@GuardedBy("lock")
-   static long freeze() {
+   static <T, E extends Throwable> T 
doWithFrozenTime(FunctionWithException<Long, T, E> action) throws E {
synchronized (lock) {
-   if (!timeIsFrozen || lastReturnedProcessingTime == 
Long.MIN_VALUE) {
-   timeIsFrozen = true;
-   return getCurrentTimestamp();
-   } else {
-   return lastReturnedProcessingTime;
-   }
+   final long timestampBeforeUpdate = freeze();
+   T result = action.apply(timestampBeforeUpdate);
+   final long timestampAfterUpdate = unfreezeTime();
+
+   checkState(timestampAfterUpdate == 
timestampBeforeUpdate,
+   "Timestamps before and after the update do not 
match.");
+   return result;
+   }
+   }
+
+   private static long freeze() {
+   if (!timeIsFrozen || lastReturnedProcessingTime == 
Long.MIN_VALUE) {
+   timeIsFrozen = true;
+   return getCurrentTimestamp();
+   } else {
+   return lastReturnedProcessingTime;
}
}
 
@@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements 
TtlTimeProvider, Serializable {
return lastReturnedProcessingTime;
}
 
-   @GuardedBy("lock")
-   static long unfreezeTime() {
-   synchronized (lock) {
-   timeIsFrozen = false;
-   return lastReturnedProcessingTime;
-   }
+   private static long unfreezeTime() {
+   timeIsFrozen = false;
+   return lastReturnedProcessingTime;
}
 }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index 94e9dbd..ed69171 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Update state with TTL for each verifier.
@@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends 
RichFlatMapFunction verifier,
Object update) throws Exception {
 
-   final long timestampBeforeUpdate = 
MonotonicTTLTimeProvider.freeze();
-   State state = states.get(verifier.getId(
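The hunk above is cut off at the caller side; a minimal sketch of the
scoped-freeze pattern the commit introduces (hypothetical names, assuming only
FunctionWithException from flink-core, not the actual TtlVerifyUpdateFunction
code):

import org.apache.flink.util.function.FunctionWithException;

public class FrozenTimeCallerSketch {

	private static boolean timeIsFrozen;

	/**
	 * Mirrors the shape of MonotonicTTLTimeProvider#doWithFrozenTime: the caller
	 * passes an action that receives the pinned timestamp and returns a result,
	 * instead of pairing separate freeze()/unfreezeTime() calls.
	 */
	static <T, E extends Throwable> T doWithFrozenTime(
			FunctionWithException<Long, T, E> action) throws E {
		// stand-in for freeze(): pin the timestamp for the duration of the action
		timeIsFrozen = true;
		long frozenTimestamp = System.currentTimeMillis();
		try {
			return action.apply(frozenTimestamp);
		} finally {
			// stand-in for unfreezeTime(); the real provider runs this under a lock
			// and checks that the timestamp did not move while the action ran
			timeIsFrozen = false;
		}
	}

	public static void main(String[] args) {
		String result = FrozenTimeCallerSketch.<String, RuntimeException>doWithFrozenTime(
			timestampBeforeUpdate -> "state updated at " + timestampBeforeUpdate);
		System.out.println(result + " (time frozen afterwards? " + timeIsFrozen + ")");
	}
}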

[flink] branch release-1.9 updated: [FLINK-14951][tests] Harden the thread safety of State TTL backend tests

2019-12-11 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 8a73d68  [FLINK-14951][tests] Harden the thread safety of State TTL 
backend tests
8a73d68 is described below

commit 8a73d680869da0d7bb4d543bfc197d01f3b0e068
Author: Yangze Guo 
AuthorDate: Tue Dec 10 10:03:49 2019 +0800

[FLINK-14951][tests] Harden the thread safety of State TTL backend tests
---
 .../streaming/tests/MonotonicTTLTimeProvider.java  | 36 ++
 .../streaming/tests/TtlVerifyUpdateFunction.java   | 21 +
 2 files changed, 30 insertions(+), 27 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
index 24bc9dc..c2e66f1 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
@@ -19,12 +19,15 @@
 package org.apache.flink.streaming.tests;
 
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A stub implementation of a {@link TtlTimeProvider} which guarantees that
  * processing time increases monotonically.
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements 
TtlTimeProvider, Serializable {
private static final Object lock = new Object();
 
@GuardedBy("lock")
-   static long freeze() {
+   static <T, E extends Throwable> T 
doWithFrozenTime(FunctionWithException<Long, T, E> action) throws E {
synchronized (lock) {
-   if (!timeIsFrozen || lastReturnedProcessingTime == 
Long.MIN_VALUE) {
-   timeIsFrozen = true;
-   return getCurrentTimestamp();
-   } else {
-   return lastReturnedProcessingTime;
-   }
+   final long timestampBeforeUpdate = freeze();
+   T result = action.apply(timestampBeforeUpdate);
+   final long timestampAfterUpdate = unfreezeTime();
+
+   checkState(timestampAfterUpdate == 
timestampBeforeUpdate,
+   "Timestamps before and after the update do not 
match.");
+   return result;
+   }
+   }
+
+   private static long freeze() {
+   if (!timeIsFrozen || lastReturnedProcessingTime == 
Long.MIN_VALUE) {
+   timeIsFrozen = true;
+   return getCurrentTimestamp();
+   } else {
+   return lastReturnedProcessingTime;
}
}
 
@@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements 
TtlTimeProvider, Serializable {
return lastReturnedProcessingTime;
}
 
-   @GuardedBy("lock")
-   static long unfreezeTime() {
-   synchronized (lock) {
-   timeIsFrozen = false;
-   return lastReturnedProcessingTime;
-   }
+   private static long unfreezeTime() {
+   timeIsFrozen = false;
+   return lastReturnedProcessingTime;
}
 }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index 94e9dbd..ed69171 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Update state with TTL for each verifier.
@@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends 
RichFlatMapFunction verifier,
Object update) throws Exception {
 
-   final long timestampBeforeUpdate = 
MonotonicTTLTimeProvider.freeze();
-   State state = states.get(verifier.getId(

[flink] branch master updated (5c89d12 -> 2c11291)

2019-12-11 Thread azagrebin
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 5c89d12  [hotfix][doc] update Hive functions page with more info
 add 2c11291  [FLINK-14951][tests] Harden the thread safety of State TTL 
backend tests

No new revisions were added by this update.

Summary of changes:
 .../streaming/tests/MonotonicTTLTimeProvider.java  | 36 ++
 .../streaming/tests/TtlVerifyUpdateFunction.java   | 21 +
 2 files changed, 30 insertions(+), 27 deletions(-)



[flink-web] branch asf-site updated (6c7c3f1 -> 20e5a8b)

2019-12-11 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from 6c7c3f1  Rebuild page
 new ff40e10  Add Vip to powered by page
 new 20e5a8b  Rebuild website

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/img/poweredby/vip-logo.png | Bin 0 -> 21680 bytes
 content/index.html |   6 ++
 content/poweredby.html |   4 
 content/zh/index.html  |   6 ++
 content/zh/poweredby.html  |   4 
 img/poweredby/vip-logo.png | Bin 0 -> 21680 bytes
 index.md   |   6 ++
 index.zh.md|   6 ++
 poweredby.md   |   4 
 poweredby.zh.md|   4 
 10 files changed, 40 insertions(+)
 create mode 100644 content/img/poweredby/vip-logo.png
 create mode 100644 img/poweredby/vip-logo.png



[flink-web] 01/02: Add Vip to powered by page

2019-12-11 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit ff40e108cc801d75cec71a143fe7638482f329f6
Author: bowen.li 
AuthorDate: Mon Dec 2 12:19:07 2019 -0800

Add Vip to powered by page

This closes #286
---
 img/poweredby/vip-logo.png | Bin 0 -> 21680 bytes
 index.md   |   6 ++
 index.zh.md|   6 ++
 poweredby.md   |   4 
 poweredby.zh.md|   4 
 5 files changed, 20 insertions(+)

diff --git a/img/poweredby/vip-logo.png b/img/poweredby/vip-logo.png
new file mode 100644
index 000..532d0bb
Binary files /dev/null and b/img/poweredby/vip-logo.png differ
diff --git a/index.md b/index.md
index 1239234..8698fb2 100644
--- a/index.md
+++ b/index.md
@@ -308,6 +308,12 @@ layout: base
 
 
   
+
+  
+
+
+
+  
 
   
 
diff --git a/index.zh.md b/index.zh.md
index 36a7131..22e809b 100644
--- a/index.zh.md
+++ b/index.zh.md
@@ -295,6 +295,12 @@ layout: base
   
 
 
+  
+
+  
+
+
+
   
 
   
diff --git a/poweredby.md b/poweredby.md
index ad606c0..b2680b2 100644
--- a/poweredby.md
+++ b/poweredby.md
@@ -134,6 +134,10 @@ If you would you like to be included on this page, please 
reach out to the [Flin
   Uber built their internal SQL-based, open-source streaming analytics 
platform AthenaX on Apache Flink. https://eng.uber.com/athenax/"; target='_blank'> Read more on the Uber 
engineering blog
   
   
+
+  Vip, one of the largest warehouse sale website for big brands in China, 
uses Flink to stream and ETL data into Apache Hive in real-time for data 
processing and analytics. https://yq.aliyun.com/articles/652548"; target='_blank'> Read more about Vip's 
story.
+  
+  
 
 Xiaomi, one of the largest electronics companies in China, built a 
platform with Flink to improve the efficiency of developing and operating 
real-time applications and use it in real-time recommendations. https://files.alicdn.com/tpsservice/d77d3ed3f2709790f0d84f4ec279a486.pdf"; 
target='_blank'> Learn more about how Xiaomi is using 
Flink.
   
diff --git a/poweredby.zh.md b/poweredby.zh.md
index 1ca3741..9812094 100644
--- a/poweredby.zh.md
+++ b/poweredby.zh.md
@@ -127,6 +127,10 @@ Apache Flink 为全球许多公司和企业的关键业务提供支持。在这
   Uber 在 Apache Flink 上构建了基于 SQL 的开源流媒体分析平台 AthenaX。https://eng.uber.com/athenax/"; target='_blank'> 更多信息请访问Uber工程博客
   
   
+
+  Vip,中国最大的品牌特卖网站之一,应用Flink实时的将数据流ETL到Hive中用于数据处理和分析. https://yq.aliyun.com/articles/652548"; target='_blank'> 详细了解Vip如何使用 Flink 
+  
+  
 
 小米,作为中国最大的专注于硬件与软件开发的公司之一,利用 Flink 
构建了一个内部平台,以提高开发运维实时应用程序的效率,并用于实时推荐等场景。https://files.alicdn.com/tpsservice/d77d3ed3f2709790f0d84f4ec279a486.pdf"; 
target='_blank'> 详细了解小米如何使用 Flink 的。
   



[flink-web] 02/02: Rebuild website

2019-12-11 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 20e5a8b7d41c51edf71c0177812e5874580cbd06
Author: Jark Wu 
AuthorDate: Wed Dec 11 16:25:15 2019 +0800

Rebuild website
---
 content/img/poweredby/vip-logo.png | Bin 0 -> 21680 bytes
 content/index.html |   6 ++
 content/poweredby.html |   4 
 content/zh/index.html  |   6 ++
 content/zh/poweredby.html  |   4 
 5 files changed, 20 insertions(+)

diff --git a/content/img/poweredby/vip-logo.png 
b/content/img/poweredby/vip-logo.png
new file mode 100644
index 000..532d0bb
Binary files /dev/null and b/content/img/poweredby/vip-logo.png differ
diff --git a/content/index.html b/content/index.html
index 8a07d08..e846c4d 100644
--- a/content/index.html
+++ b/content/index.html
@@ -482,6 +482,12 @@
 
 
   
+
+  
+
+
+
+  
 
   
 
diff --git a/content/poweredby.html b/content/poweredby.html
index 253a21f..5e6a1ad 100644
--- a/content/poweredby.html
+++ b/content/poweredby.html
@@ -312,6 +312,10 @@
   Uber built their internal SQL-based, open-source streaming analytics 
platform AthenaX on Apache Flink. https://eng.uber.com/athenax/"; target="_blank"> Read more on the Uber 
engineering blog
   
   
+
+  Vip, one of the largest warehouse sale website for big brands in China, 
uses Flink to stream and ETL data into Apache Hive in real-time for data 
processing and analytics. https://yq.aliyun.com/articles/652548"; target="_blank"> Read more about Vip's 
story.
+  
+  
 
 Xiaomi, one of the largest electronics companies in China, built a 
platform with Flink to improve the efficiency of developing and operating 
real-time applications and use it in real-time recommendations. https://files.alicdn.com/tpsservice/d77d3ed3f2709790f0d84f4ec279a486.pdf"; 
target="_blank"> Learn more about how Xiaomi is using 
Flink.
   
diff --git a/content/zh/index.html b/content/zh/index.html
index d1c0b2c..758724a 100644
--- a/content/zh/index.html
+++ b/content/zh/index.html
@@ -467,6 +467,12 @@
   
 
 
+  
+
+  
+
+
+
   
 
   
diff --git a/content/zh/poweredby.html b/content/zh/poweredby.html
index c38dbf4..f98df9e 100644
--- a/content/zh/poweredby.html
+++ b/content/zh/poweredby.html
@@ -303,6 +303,10 @@
   Uber 在 Apache Flink 上构建了基于 SQL 的开源流媒体分析平台 AthenaX。https://eng.uber.com/athenax/"; target="_blank"> 更多信息请访问Uber工程博客
   
   
+
+  Vip,中国最大的品牌特卖网站之一,应用Flink实时的将数据流ETL到Hive中用于数据处理和分析. https://yq.aliyun.com/articles/652548"; target="_blank"> 详细了解Vip如何使用 Flink 
+  
+  
 
 小米,作为中国最大的专注于硬件与软件开发的公司之一,利用 Flink 
构建了一个内部平台,以提高开发运维实时应用程序的效率,并用于实时推荐等场景。https://files.alicdn.com/tpsservice/d77d3ed3f2709790f0d84f4ec279a486.pdf"; 
target="_blank"> 详细了解小米如何使用 Flink 的。