[flink] branch release-1.10 updated: [FLINK-15073][sql client] Sql client fails to run same query multiple times
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 4a5a720  [FLINK-15073][sql client] Sql client fails to run same query multiple times

4a5a720 is described below

commit 4a5a720024992c12bbfd4fb316d04f24d23a109e
Author: yuzhao.cyz
AuthorDate: Mon Dec 9 14:46:44 2019 +0800

    [FLINK-15073][sql client] Sql client fails to run same query multiple times

    After FLINK-14672 changed the SQL CLI to be stateful, each query's temporary
    table was left behind, so the same object (from the same query) could not be
    re-registered.

    This closes #10523

    (cherry picked from commit 7bf96cf5fbd76377f5054c3b3f6552615a94c11d)
---
 .../table/client/gateway/local/LocalExecutor.java  |  12 +-
 .../client/gateway/local/LocalExecutorITCase.java  | 121 +
 2 files changed, 130 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index c9f50d0..1174b09 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -619,16 +619,16 @@ public class LocalExecutor implements Executor {
 			removeTimeAttributes(table.getSchema()),
 			context.getExecutionConfig(),
 			context.getClassLoader());
-		final String jobName = sessionId + ": " + query;
+		final String tableName = String.format("_tmp_table_%s", Math.abs(query.hashCode()));
 		final Pipeline pipeline;
 		try {
 			// writing to a sink requires an optimization step that might reference UDFs during code compilation
 			context.wrapClassLoader(() -> {
-				context.getTableEnvironment().registerTableSink(jobName, result.getTableSink());
+				context.getTableEnvironment().registerTableSink(tableName, result.getTableSink());
 				table.insertInto(
 					context.getQueryConfig(),
-					jobName);
+					tableName);
 				return null;
 			});
 			pipeline = context.createPipeline(jobName, context.getFlinkConfig());
@@ -638,6 +638,12 @@ public class LocalExecutor implements Executor {
 			result.close();
 			// catch everything such that the query does not crash the executor
 			throw new SqlExecutionException("Invalid SQL query.", t);
+		} finally {
+			// Remove the temporary table object.
+			context.wrapClassLoader(() -> {
+				context.getTableEnvironment().dropTemporaryTable(tableName);
+				return null;
+			});
 		}

 		// store the result with a unique id

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index c7a19bf..10c797f 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -450,6 +450,50 @@ public class LocalExecutorITCase extends TestLogger {
 		}
 	}

+	@Test(timeout = 90_000L)
+	public void testStreamQueryExecutionChangelogMultipleTimes() throws Exception {
+		final URL url = getClass().getClassLoader().getResource("test-data.csv");
+		Objects.requireNonNull(url);
+		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
+		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+		replaceVars.put("$VAR_RESULT_MODE", "changelog");
+		replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+		replaceVars.put("$VAR_MAX_ROWS", "100");
+
+		final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
+		final SessionContext session = new
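The essence of the patch is that the SQL CLI now derives a deterministic temporary-table name from the query text and always drops it in a finally block, so re-running the same query can re-register cleanly. A minimal, self-contained sketch of that register-then-drop pattern, using a plain map in place of Flink's TableEnvironment (which is not on the classpath here); the class and method names are illustrative, not Flink's API:

```java
import java.util.HashMap;
import java.util.Map;

public class TempTableLifecycle {
    // Stands in for the stateful session's table registry (hypothetical; the real
    // commit calls TableEnvironment.registerTableSink/dropTemporaryTable).
    private final Map<String, String> registeredTables = new HashMap<>();

    // Same naming scheme as the patch: a deterministic name per query text.
    static String tempTableName(String query) {
        return String.format("_tmp_table_%s", Math.abs(query.hashCode()));
    }

    void runQuery(String query) {
        String tableName = tempTableName(query);
        if (registeredTables.containsKey(tableName)) {
            // This is the pre-fix failure mode: the leftover registration
            // from the previous run of the same query collides here.
            throw new IllegalStateException("Table already registered: " + tableName);
        }
        registeredTables.put(tableName, query);
        try {
            // ... build and submit the pipeline here ...
        } finally {
            // The fix: always drop the temporary table, even on failure,
            // so the same query can run again in the same session.
            registeredTables.remove(tableName);
        }
    }
}
```

Because the name depends only on the query text, two runs of the same query target the same temporary table, which is exactly why the cleanup in finally is required.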
[flink] branch master updated (98dd5dc -> 7bf96cf)
This is an automated email from the ASF dual-hosted git repository. kurt pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git.

 from 98dd5dc  [FLINK-14007][python][docs] Add documentation for how to use Java user-defined source/sink in Python API
  add 7bf96cf  [FLINK-15073][sql client] Sql client fails to run same query multiple times

No new revisions were added by this update.

Summary of changes:
 .../table/client/gateway/local/LocalExecutor.java  |  12 +-
 .../client/gateway/local/LocalExecutorITCase.java  | 121 +
 2 files changed, 130 insertions(+), 3 deletions(-)
[flink-web] branch asf-site updated (b69c6fe -> f78b71e)
This is an automated email from the ASF dual-hosted git repository. hequn pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git.

 from b69c6fe  Rebuild website
  new 493689f  Add Apache Flink release 1.8.3
  new f78b71e  Rebuild website

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 _config.yml                        |  56
 _posts/2019-12-11-release-1.8.3.md | 144 +
 content/blog/feed.xml              | 140
 content/blog/index.html            |  38 --
 content/blog/page10/index.html     |  25
 content/blog/page2/index.html      |  38 +++---
 content/blog/page3/index.html      |  40 +++---
 content/blog/page4/index.html      |  38 --
 content/blog/page5/index.html      |  37 --
 content/blog/page6/index.html      |  39 +++---
 content/blog/page7/index.html      |  38 --
 content/blog/page8/index.html      |  37 --
 content/blog/page9/index.html      |  39 +++---
 content/downloads.html             |  29 +++--
 content/index.html                 |  10 +-
 .../11/release-1.8.3.html}         | 131 +--
 content/zh/downloads.html          |  33 +++--
 content/zh/index.html              |  10 +-
 18 files changed, 670 insertions(+), 252 deletions(-)
 create mode 100644 _posts/2019-12-11-release-1.8.3.md
 copy content/news/2019/{07/02/release-1.8.1.html => 12/11/release-1.8.3.html} (60%)
[flink-web] 01/02: Add Apache Flink release 1.8.3
This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 493689f49300f3b68685ccaae6224f1614eaa041
Author: hequn8128
AuthorDate: Wed Nov 27 12:59:26 2019 +0800

    Add Apache Flink release 1.8.3
---
 _config.yml                        |  56 ---
 _posts/2019-12-11-release-1.8.3.md | 144 +
 2 files changed, 174 insertions(+), 26 deletions(-)

diff --git a/_config.yml b/_config.yml
index 1d880b8..bbf68d3 100644
--- a/_config.yml
+++ b/_config.yml
@@ -129,23 +129,23 @@ flink_releases:
   - version_short: 1.8
     binary_release:
-      name: "Apache Flink 1.8.2"
+      name: "Apache Flink 1.8.3"
       scala_211:
-        id: "182-download_211"
-        url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.11.tgz"
-        asc_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.11.tgz.asc"
-        sha512_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.11.tgz.sha512"
+        id: "183-download_211"
+        url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.11.tgz"
+        asc_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.11.tgz.asc"
+        sha512_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.11.tgz.sha512"
       scala_212:
-        id: "182-download_212"
-        url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.12.tgz"
-        asc_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.12.tgz.asc"
-        sha512_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-bin-scala_2.12.tgz.sha512"
+        id: "183-download_212"
+        url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.12.tgz"
+        asc_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.12.tgz.asc"
+        sha512_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-bin-scala_2.12.tgz.sha512"
     source_release:
-      name: "Apache Flink 1.8.2"
-      id: "182-download-source"
-      url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.2/flink-1.8.2-src.tgz"
-      asc_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz.asc"
-      sha512_url: "https://www.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz.sha512"
+      name: "Apache Flink 1.8.3"
+      id: "183-download-source"
+      url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.3/flink-1.8.3-src.tgz"
+      asc_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-src.tgz.asc"
+      sha512_url: "https://www.apache.org/dist/flink/flink-1.8.3/flink-1.8.3-src.tgz.sha512"

     optional_components:
       - name: "Pre-bundled Hadoop 2.4.1"
@@ -183,26 +183,26 @@ flink_releases:
       name: "Avro SQL Format"
       category: "SQL Formats"
       scala_dependent: false
-      id: 182-sql-format-avro
-      url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.2/flink-avro-1.8.2.jar
-      asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.2/flink-avro-1.8.2.jar.asc
-      sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.2/flink-avro-1.8.2.jar.sha1
+      id: 183-sql-format-avro
+      url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.3/flink-avro-1.8.3.jar
+      asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.3/flink-avro-1.8.3.jar.asc
+      sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.3/flink-avro-1.8.3.jar.sha1
     - name: "CSV SQL Format"
       category: "SQL Formats"
       scala_dependent: false
-      id: 182-sql-format-csv
-      url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.2/flink-csv-1.8.2.jar
-      asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.2/flink-csv-1.8.2.jar.asc
-      sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.2/flink-csv-1.8.2.jar.sha1
+      id: 183-sql-format-csv
+      url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.3/flink-csv-1.8.3.jar
+      asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.3/flink-csv-1.8.3.jar.asc
+      sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.3/flink-csv-1.8.3.jar.sha1
     - name: "JSON SQL Format"
       category: "SQL Formats
[flink-web] 02/02: Rebuild website
This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit f78b71ee1e70d5350e8527602e08b5ae87bec053
Author: hequn8128
AuthorDate: Thu Dec 12 14:03:22 2019 +0800

    Rebuild website
---
 content/blog/feed.xml                      | 140 +++
 content/blog/index.html                    |  38 ++-
 content/blog/page10/index.html             |  25 ++
 content/blog/page2/index.html              |  38 +--
 content/blog/page3/index.html              |  40 ++--
 content/blog/page4/index.html              |  38 ++-
 content/blog/page5/index.html              |  37 +--
 content/blog/page6/index.html              |  39 +--
 content/blog/page7/index.html              |  38 ++-
 content/blog/page8/index.html              |  37 +--
 content/blog/page9/index.html              |  39 +--
 content/downloads.html                     |  29 ++-
 content/index.html                         |  10 +-
 content/news/2019/12/11/release-1.8.3.html | 371 +
 content/zh/downloads.html                  |  33 ++-
 content/zh/index.html                      |  10 +-
 16 files changed, 805 insertions(+), 157 deletions(-)

diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index 0a838e1..c044975 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -7,6 +7,146 @@
 https://flink.apache.org/blog/feed.xml" rel="self" type="application/rss+xml" />
+Apache Flink 1.8.3 Released
+The Apache Flink community released the third bugfix version of the Apache Flink 1.8 series.
+
+This release includes 45 fixes and minor improvements for Flink 1.8.2. The list below includes a detailed list of all fixes and improvements.
+
+We highly recommend all users to upgrade to Flink 1.8.3.
+
+Updated Maven dependencies:
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>1.8.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_2.11</artifactId>
+  <version>1.8.3</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_2.11</artifactId>
+  <version>1.8.3</version>
+</dependency>
+
+You can find the binaries on the updated Downloads page.
+
+List of resolved issues:
+
+Sub-task
+
+- [FLINK-13723] - Use liquid-c for faster doc generation
+- [FLINK-13724] - Remove unnecessary whitespace from the docs' sidenav
+- [FLINK-13725] - Use sassc for faster doc generation
+- [FLINK-13726] - Build docs with jekyll 4.0.0.pre.beta1
+- [FLINK-13791] - Speed up sidenav by using group_by
+
+Bug
+
+- [FLINK-12342] - Yarn Resource Manager Acquires Too Many Containers
+- [FLINK-13184] - Starting a TaskExecutor blocks the YarnResourceManager's main thread
+- [FLINK-13728] - Fix wrong closing tag order in sidenav
+- [FLINK-13746] - Elasticsearch (v2.3.5) sink end-to-end test fails on Travis
+- [FLINK-13749] - Make Flink client respect classloading policy
+- [FLINK-13892] - HistoryServerTest failed on Travis
[flink] branch release-1.10 updated: [FLINK-14007][python][docs] Add documentation for how to use Java user-defined source/sink in Python API
This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 21e3c2f  [FLINK-14007][python][docs] Add documentation for how to use Java user-defined source/sink in Python API

21e3c2f is described below

commit 21e3c2f3380bc0fbdd54c074841f69acd4d5d40f
Author: hequn8128
AuthorDate: Tue Dec 10 21:29:43 2019 +0800

    [FLINK-14007][python][docs] Add documentation for how to use Java user-defined source/sink in Python API

    This closes #10515.
---
 docs/dev/table/sourceSinks.md    | 58 +++-
 docs/dev/table/sourceSinks.zh.md | 56 +-
 2 files changed, 76 insertions(+), 38 deletions(-)

diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index d0e4781..12fdb82 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -714,10 +714,11 @@ connector.debug=true

 For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors](connect.html) for sources, sinks, and formats as a reference.

-A connector for `MySystem` in our example can extend `ConnectorDescriptor` as shown below:
-
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
+
 {% highlight java %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import java.util.HashMap;
@@ -743,9 +744,25 @@ public class MySystemConnector extends ConnectorDescriptor {
   }
 }
 {% endhighlight %}
+
+The descriptor can then be used to create a table with the table environment.
+
+{% highlight java %}
+StreamTableEnvironment tableEnv = // ...
+
+tableEnv
+  .connect(new MySystemConnector(true))
+  .withSchema(...)
+  .inAppendMode()
+  .createTemporaryTable("MySystemTable");
+{% endhighlight %}
+
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
+
 {% highlight scala %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor
 import java.util.HashMap
@@ -763,25 +780,9 @@ class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system
   }
 }
 {% endhighlight %}
-
-
-The descriptor can then be used in the API as follows:
+The descriptor can then be used to create a table with the table environment.

-
-
-{% highlight java %}
-StreamTableEnvironment tableEnv = // ...
-
-tableEnv
-  .connect(new MySystemConnector(true))
-  .withSchema(...)
-  .inAppendMode()
-  .createTemporaryTable("MySystemTable");
-{% endhighlight %}
-
-
-
 {% highlight scala %}
 val tableEnv: StreamTableEnvironment = // ...

@@ -791,7 +792,26 @@ tableEnv
   .inAppendMode()
   .createTemporaryTable("MySystemTable")
 {% endhighlight %}
+
+
+
+
+
+You can use a Java `TableFactory` from Python using the `CustomConnectorDescriptor`.
+
+{% highlight python %}
+s_env = StreamExecutionEnvironment.get_execution_environment()
+st_env = StreamTableEnvironment.create(s_env)
+
+custom_connector = CustomConnectorDescriptor('my-system', 1, False)
+st_env \
+    .connect(custom_connector.property("connector.debug", "true")) \
+    .with_schema(...) \
+    .in_append_mode() \
+    .create_temporary_table("MySystemTable")
+{% endhighlight %}
+
 {% top %}

diff --git a/docs/dev/table/sourceSinks.zh.md b/docs/dev/table/sourceSinks.zh.md
index d0e4781..e6531b0 100644
--- a/docs/dev/table/sourceSinks.zh.md
+++ b/docs/dev/table/sourceSinks.zh.md
@@ -714,10 +714,11 @@ connector.debug=true

 For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors](connect.html) for sources, sinks, and formats as a reference.

-A connector for `MySystem` in our example can extend `ConnectorDescriptor` as shown below:
-
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
+
 {% highlight java %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import java.util.HashMap;
@@ -743,9 +744,24 @@ public class MySystemConnector extends ConnectorDescriptor {
   }
 }
 {% endhighlight %}
+
+The descriptor can then be used to create a table with the table environment.
+
+{% highlight java %}
+StreamTableEnvironment tableEnv = // ...
+
+tableEnv
+  .connect(new MySystemConnector(true))
+  .withSchema(...)
+  .inAppendMode()
+  .createTemporaryTable("MySystemTable");
+{% endhighlight %}
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
+
 {% highlight scala %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor
 import java.util.HashMap
@@ -763,25 +779,9 @@ class MySystemConne
[flink] branch master updated: [FLINK-14007][python][docs] Add documentation for how to use Java user-defined source/sink in Python API
This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 98dd5dc  [FLINK-14007][python][docs] Add documentation for how to use Java user-defined source/sink in Python API

98dd5dc is described below

commit 98dd5dc4cf3b31ff7b93bf880d982306dab27845
Author: hequn8128
AuthorDate: Tue Dec 10 21:29:43 2019 +0800

    [FLINK-14007][python][docs] Add documentation for how to use Java user-defined source/sink in Python API

    This closes #10515.
---
 docs/dev/table/sourceSinks.md    | 58 +++-
 docs/dev/table/sourceSinks.zh.md | 56 +-
 2 files changed, 76 insertions(+), 38 deletions(-)

diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index d0e4781..12fdb82 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -714,10 +714,11 @@ connector.debug=true

 For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors](connect.html) for sources, sinks, and formats as a reference.

-A connector for `MySystem` in our example can extend `ConnectorDescriptor` as shown below:
-
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
+
 {% highlight java %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import java.util.HashMap;
@@ -743,9 +744,25 @@ public class MySystemConnector extends ConnectorDescriptor {
   }
 }
 {% endhighlight %}
+
+The descriptor can then be used to create a table with the table environment.
+
+{% highlight java %}
+StreamTableEnvironment tableEnv = // ...
+
+tableEnv
+  .connect(new MySystemConnector(true))
+  .withSchema(...)
+  .inAppendMode()
+  .createTemporaryTable("MySystemTable");
+{% endhighlight %}
+
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
+
 {% highlight scala %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor
 import java.util.HashMap
@@ -763,25 +780,9 @@ class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system
   }
 }
 {% endhighlight %}
-
-
-The descriptor can then be used in the API as follows:
+The descriptor can then be used to create a table with the table environment.

-
-
-{% highlight java %}
-StreamTableEnvironment tableEnv = // ...
-
-tableEnv
-  .connect(new MySystemConnector(true))
-  .withSchema(...)
-  .inAppendMode()
-  .createTemporaryTable("MySystemTable");
-{% endhighlight %}
-
-
-
 {% highlight scala %}
 val tableEnv: StreamTableEnvironment = // ...

@@ -791,7 +792,26 @@ tableEnv
   .inAppendMode()
   .createTemporaryTable("MySystemTable")
 {% endhighlight %}
+
+
+
+
+
+You can use a Java `TableFactory` from Python using the `CustomConnectorDescriptor`.
+
+{% highlight python %}
+s_env = StreamExecutionEnvironment.get_execution_environment()
+st_env = StreamTableEnvironment.create(s_env)
+
+custom_connector = CustomConnectorDescriptor('my-system', 1, False)
+st_env \
+    .connect(custom_connector.property("connector.debug", "true")) \
+    .with_schema(...) \
+    .in_append_mode() \
+    .create_temporary_table("MySystemTable")
+{% endhighlight %}
+
 {% top %}

diff --git a/docs/dev/table/sourceSinks.zh.md b/docs/dev/table/sourceSinks.zh.md
index d0e4781..e6531b0 100644
--- a/docs/dev/table/sourceSinks.zh.md
+++ b/docs/dev/table/sourceSinks.zh.md
@@ -714,10 +714,11 @@ connector.debug=true

 For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors](connect.html) for sources, sinks, and formats as a reference.

-A connector for `MySystem` in our example can extend `ConnectorDescriptor` as shown below:
-
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
+
 {% highlight java %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import java.util.HashMap;
@@ -743,9 +744,24 @@ public class MySystemConnector extends ConnectorDescriptor {
   }
 }
 {% endhighlight %}
+
+The descriptor can then be used to create a table with the table environment.
+
+{% highlight java %}
+StreamTableEnvironment tableEnv = // ...
+
+tableEnv
+  .connect(new MySystemConnector(true))
+  .withSchema(...)
+  .inAppendMode()
+  .createTemporaryTable("MySystemTable");
+{% endhighlight %}
+
+A custom descriptor can be defined by extending the `ConnectorDescriptor` class.
+
 {% highlight scala %}
 import org.apache.flink.table.descriptors.ConnectorDescriptor
 import java.util.HashMap
@@ -763,25 +779,9 @@ class MySystemConnector(isDebug
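The documentation added in this commit revolves around a ConnectorDescriptor subclass translating typed options into flat string-based properties. A rough, dependency-free sketch of that translation idea, since Flink itself is not available here (the class below is an illustrative stand-in, not Flink's ConnectorDescriptor API; the property keys mirror the documented example):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a ConnectorDescriptor: typed options go in,
// a flat map of string properties comes out (the pattern the docs describe).
public class MySystemProperties {
    private final boolean isDebug;

    public MySystemProperties(boolean isDebug) {
        this.isDebug = isDebug;
    }

    // Mirrors the role of toConnectorProperties() in the documented example.
    public Map<String, String> toProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "my-system");
        properties.put("connector.debug", Boolean.toString(isDebug));
        return properties;
    }
}
```

The string map is what the framework's discovery service matches against a factory, which is why every typed option must round-trip through a string key/value pair.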
[flink] branch release-1.10 updated: [FLINK-15061][table]create/alter and table/databases properties should be case sensitive stored in catalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.10 by this push:
     new c35ae05  [FLINK-15061][table]create/alter and table/databases properties should be case sensitive stored in catalog

c35ae05 is described below

commit c35ae053942860d7bb4677124cb21821db968954
Author: zjuwangg
AuthorDate: Mon Dec 9 15:28:39 2019 +0800

    [FLINK-15061][table]create/alter and table/databases properties should be case sensitive stored in catalog

    this closes #10493.
---
 .../operations/SqlToOperationConverter.java        | 11 +++
 .../operations/SqlToOperationConverterTest.java    | 35 +++---
 .../table/sqlexec/SqlToOperationConverter.java     | 12 +++-
 .../table/sqlexec/SqlToOperationConverterTest.java | 35 +++---
 4 files changed, 56 insertions(+), 37 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index fab5fee..bcd3f34 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -172,8 +172,7 @@ public class SqlToOperationConverter {
 		// set with properties
 		Map<String, String> properties = new HashMap<>();
 		sqlCreateTable.getPropertyList().getList().forEach(p ->
-			properties.put(((SqlTableOption) p).getKeyString().toLowerCase(),
-				((SqlTableOption) p).getValueString()));
+			properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
 		TableSchema tableSchema = createTableSchema(sqlCreateTable);
 		String tableComment = sqlCreateTable.getComment().map(comment ->
@@ -223,8 +222,7 @@ public class SqlToOperationConverter {
 		Map<String, String> properties = new HashMap<>();
 		properties.putAll(originalCatalogTable.getProperties());
 		((SqlAlterTableProperties) sqlAlterTable).getPropertyList().getList().forEach(p ->
-			properties.put(((SqlTableOption) p).getKeyString().toLowerCase(),
-				((SqlTableOption) p).getValueString()));
+			properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
 		CatalogTable catalogTable = new CatalogTableImpl(
 			originalCatalogTable.getSchema(),
 			originalCatalogTable.getPartitionKeys(),
@@ -386,8 +384,7 @@ public class SqlToOperationConverter {
 		// set with properties
 		Map<String, String> properties = new HashMap<>();
 		sqlCreateDatabase.getPropertyList().getList().forEach(p ->
-			properties.put(((SqlTableOption) p).getKeyString().toLowerCase(),
-				((SqlTableOption) p).getValueString()));
+			properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
 		CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(properties, databaseComment);
 		return new CreateDatabaseOperation(catalogName, databaseName, catalogDatabase, ignoreIfExists);
 	}
@@ -430,7 +427,7 @@ public class SqlToOperationConverter {
 		}
 		// set with properties
 		sqlAlterDatabase.getPropertyList().getList().forEach(p ->
-			properties.put(((SqlTableOption) p).getKeyString().toLowerCase(), ((SqlTableOption) p).getValueString()));
+			properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
 		CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(properties, originCatalogDatabase.getComment());
 		return new AlterDatabaseOperation(catalogName, databaseName, catalogDatabase);
 	}

diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 61239fe..c5bdef7 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-bl
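The change above boils down to no longer calling toLowerCase() on property keys before storing them in the catalog. A small self-contained sketch contrasting the two behaviors (plain Java maps; the Flink converter and catalog classes are omitted, and the method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyCaseDemo {
    // Old behavior: keys were lower-cased on the way into the catalog,
    // so a mixed-case key from the DDL could no longer be looked up as written.
    static boolean foundAfterLowerCasing(String ddlKey) {
        Map<String, String> catalog = new HashMap<>();
        catalog.put(ddlKey.toLowerCase(), "value");
        return catalog.containsKey(ddlKey);
    }

    // New behavior (the fix): keys are stored exactly as written in the DDL.
    static boolean foundCaseSensitive(String ddlKey) {
        Map<String, String> catalog = new HashMap<>();
        catalog.put(ddlKey, "value");
        return catalog.containsKey(ddlKey);
    }
}
```

Lower-casing only breaks keys that contain upper-case characters, which is why the bug went unnoticed for the common all-lower-case connector options.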
[flink] branch master updated (5b09f19 -> ab4c31c)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git.

 from 5b09f19  [FLINK-13577][ml] Add an util class to build result row and generate result schema.
  add ab4c31c  [FLINK-15061][table]create/alter and table/databases properties should be case sensitive stored in catalog

No new revisions were added by this update.

Summary of changes:
 .../operations/SqlToOperationConverter.java        | 11 +++
 .../operations/SqlToOperationConverterTest.java    | 35 +++---
 .../table/sqlexec/SqlToOperationConverter.java     | 12 +++-
 .../table/sqlexec/SqlToOperationConverterTest.java | 35 +++---
 4 files changed, 56 insertions(+), 37 deletions(-)
[flink] branch master updated (bdb4de7 -> 5b09f19)
This is an automated email from the ASF dual-hosted git repository. rongr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git.

 from bdb4de7  [hotfix][doc] update catalog APIs
  add 5b09f19  [FLINK-13577][ml] Add an util class to build result row and generate result schema.

No new revisions were added by this update.

Summary of changes:
 .../flink/ml/common/utils/OutputColsHelper.java    | 202
 .../ml/common/utils/OutputColsHelperTest.java      | 263 +
 2 files changed, 465 insertions(+)
 create mode 100644 flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/utils/OutputColsHelper.java
 create mode 100644 flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/utils/OutputColsHelperTest.java
[flink] branch release-1.10 updated: [hotfix][doc] update catalog APIs
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new a3d708a [hotfix][doc] update catalog APIs a3d708a is described below commit a3d708a679123f7ad4db91c2a8c68769ad430baa Author: bowen.li AuthorDate: Wed Dec 11 12:01:41 2019 -0800 [hotfix][doc] update catalog APIs this closes #10525. --- docs/dev/table/catalogs.md| 245 ++ docs/dev/table/catalogs.zh.md | 245 ++ 2 files changed, 490 insertions(+) diff --git a/docs/dev/table/catalogs.md b/docs/dev/table/catalogs.md index cfd34b5..3cda99c 100644 --- a/docs/dev/table/catalogs.md +++ b/docs/dev/table/catalogs.md @@ -54,8 +54,253 @@ To use custom catalogs in SQL CLI, users should develop both a catalog and its c The catalog factory defines a set of properties for configuring the catalog when the SQL CLI bootstraps. The set of properties will be passed to a discovery service where the service tries to match the properties to a `CatalogFactory` and initiate a corresponding catalog instance. +## How to Create and Register Flink Tables to Catalog + +### Using SQL DDL + +Users can use SQL DDL to create tables in catalogs in both Table API and SQL. + +For Table API: + + + +{% highlight java %} +TableEnvironment tableEnv = ... 
+ +// Create a HiveCatalog +Catalog catalog = new HiveCatalog("myhive", null, "", ""); + +// Register the catalog +tableEnv.registerCatalog("myhive", catalog); + +// Create a catalog database +tableEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)"); + +// Create a catalog table +tableEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)"); + +tableEnv.sqlQuery("SHOW TABLES"); // should see the table + +{% endhighlight %} + + + +For SQL Client: + +{% highlight sql %} +// the catalog should have been registered via yaml file +Flink SQL> CREATE DATABASE mydb WITH (...); + +Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...); + +Flink SQL> SHOW TABLES; +mytable +{% endhighlight %} + +For detailed information, please check out [Flink SQL DDL](({{ site.baseurl }}/dev/table/sql.html#create-table)). + +### Using Java/Scala/Python API + +Users can use Java, Scala, or Python API to create catalog tables programmatically. + + + +{% highlight java %} +TableEnvironment tableEnv = ... + +// Create a HiveCatalog +Catalog catalog = new HiveCatalog("myhive", null, "", ""); + +// Register the catalog +tableEnv.registerCatalog("myhive", catalog); + +// Create a catalog database +catalog.createDatabase("mydb", new CatalogDatabaseImpl(...)) + +// Create a catalog table +TableSchema schema = TableSchema.builder() +.field("name", DataTypes.STRING()) +.field("age", DataTypes.INT()) +.build(); + +catalog.createTable( +new ObjectPath("mydb", "mytable"), +new CatalogTableImpl( +schema, +new Kafka() +.version("0.11") + +.startFromEarlist(), +"my comment" +) +); + +List tables = catalog.listTables("mydb); // tables should contain "mytable" + + + + ## Catalog API +Note: only catalog program APIs are listed here. Users can achieve many of the same funtionalities with SQL DDL. +For detailed DDL information, please refer to [SQL DDL](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#ddl). 
+
+### Database operations
+
+{% highlight java %}
+// create database
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
+
+// drop database
+catalog.dropDatabase("mydb", false);
+
+// alter database
+catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
+
+// get database
+catalog.getDatabase("mydb");
+
+// check if a database exists
+catalog.databaseExists("mydb");
+
+// list databases in a catalog
+catalog.listDatabases();
+{% endhighlight %}
+
+### Table operations
+
+{% highlight java %}
+// create table
+catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+
+// drop table
+catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
+
+// alter table
+catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
+
+// rename table
+catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);
+
+// get table
+catalog.getTable(new ObjectPath("mydb", "mytable"));
+
+// check if a table exists or not
+catalog.tableExists(new ObjectPath("mydb", "mytable"));
+
+// list tables in a database
+catalog.listTables("mydb");
+{% endhighlight %}
+
+### View operations
+
+{% highlight java %}
+// create view
+catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
+
+// drop view
+catalog.dropTable(new ObjectPath("mydb", "myview"), false);
+
+// alter view
+catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
+{% endhighlight %}
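The database/table/view operations above all share one contract: a path identifies the object, and a trailing boolean controls whether an already-existing (or missing) object is an error. The following toy, self-contained sketch illustrates that contract with a hypothetical in-memory stand-in — it is not Flink's `Catalog` interface or its `GenericInMemoryCatalog`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy stand-in for a catalog (illustration only): tables are keyed by
// "db.table" paths, and the ignoreIfExists flags mirror the booleans
// in the calls above.
final class ToyCatalog {
    private final Set<String> databases = new HashSet<>();
    private final Map<String, String> tables = new HashMap<>(); // path -> comment

    void createDatabase(String name, boolean ignoreIfExists) {
        if (!databases.add(name) && !ignoreIfExists) {
            throw new IllegalStateException("database exists: " + name);
        }
    }

    void createTable(String db, String table, String comment, boolean ignoreIfExists) {
        String path = db + "." + table;
        if (tables.putIfAbsent(path, comment) != null && !ignoreIfExists) {
            throw new IllegalStateException("table exists: " + path);
        }
    }

    boolean tableExists(String db, String table) {
        return tables.containsKey(db + "." + table);
    }

    List<String> listTables(String db) {
        List<String> result = new ArrayList<>();
        for (String path : tables.keySet()) {
            if (path.startsWith(db + ".")) {
                result.add(path.substring(db.length() + 1));
            }
        }
        Collections.sort(result);
        return result;
    }
}

public class CatalogSketch {
    public static void main(String[] args) {
        ToyCatalog catalog = new ToyCatalog();
        catalog.createDatabase("mydb", false);
        catalog.createTable("mydb", "mytable", "my comment", false);
        System.out.println(catalog.tableExists("mydb", "mytable")); // true
        System.out.println(catalog.listTables("mydb"));             // [mytable]
    }
}
```

Flink's real implementations signal the same conditions with checked exceptions such as `DatabaseAlreadyExistException` rather than `IllegalStateException`, but the flag semantics are the same.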
[flink] branch master updated (e40e158 -> bdb4de7)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from e40e158 [FLINK-15203][doc] rephrase Hive's data types doc add bdb4de7 [hotfix][doc] update catalog APIs No new revisions were added by this update. Summary of changes: docs/dev/table/catalogs.md| 245 ++ docs/dev/table/catalogs.zh.md | 245 ++ 2 files changed, 490 insertions(+)
[flink] branch master updated (26de396 -> e40e158)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 26de396 [hotfix][doc] update Hive doc add e40e158 [FLINK-15203][doc] rephrase Hive's data types doc No new revisions were added by this update. Summary of changes: docs/dev/table/hive/index.md| 23 +-- docs/dev/table/hive/index.zh.md | 19 +-- 2 files changed, 22 insertions(+), 20 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15203][doc] rephrase Hive's data types doc
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new df54cd1 [FLINK-15203][doc] rephrase Hive's data types doc df54cd1 is described below commit df54cd10c4f18904366833132bdeaa33427e8ba4 Author: bowen.li AuthorDate: Wed Dec 11 12:37:26 2019 -0800 [FLINK-15203][doc] rephrase Hive's data types doc this closes #10537. --- docs/dev/table/hive/index.md| 23 +-- docs/dev/table/hive/index.zh.md | 19 +-- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index 9a83882..e242ccf 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -272,6 +272,10 @@ Currently `HiveCatalog` supports most Flink data types with the following mappin DATE +TIMESTAMP +TIMESTAMP + + BYTES BINARY @@ -290,15 +294,14 @@ Currently `HiveCatalog` supports most Flink data types with the following mappin -### Limitations - -The following limitations in Hive's data types impact the mapping between Flink and Hive: -* `CHAR(p)` has a maximum length of 255 -* `VARCHAR(p)` has a maximum length of 65535 +* Hive's `CHAR(p)` has a maximum length of 255 +* Hive's `VARCHAR(p)` has a maximum length of 65535 * Hive's `MAP` only supports primitive key types while Flink's `MAP` can be any data type -* Hive's `UNION` type is not supported -* Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type -* Flink's `TIMESTAMP_WITH_TIME_ZONE` and `TIMESTAMP_WITH_LOCAL_TIME_ZONE` are not supported by Hive -* Flink's `TIMESTAMP_WITHOUT_TIME_ZONE` type cannot be mapped to Hive's `TIMESTAMP` type due to precision difference. 
-* Flink's `MULTISET` is not supported by Hive + + +Note that: + +* Flink doesn't support Hive's `UNION` type +* Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET` +* Flink's `INTERVAL` type cannot be mapped to Hive's `INTERVAL` type yet diff --git a/docs/dev/table/hive/index.zh.md b/docs/dev/table/hive/index.zh.md index 1136ffe..708e228 100644 --- a/docs/dev/table/hive/index.zh.md +++ b/docs/dev/table/hive/index.zh.md @@ -290,15 +290,14 @@ Currently `HiveCatalog` supports most Flink data types with the following mappin -### Limitations -The following limitations in Hive's data types impact the mapping between Flink and Hive: - -* `CHAR(p)` has a maximum length of 255 -* `VARCHAR(p)` has a maximum length of 65535 +* Hive's `CHAR(p)` has a maximum length of 255 +* Hive's `VARCHAR(p)` has a maximum length of 65535 * Hive's `MAP` only supports primitive key types while Flink's `MAP` can be any data type -* Hive's `UNION` type is not supported -* Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type -* Flink's `TIMESTAMP_WITH_TIME_ZONE` and `TIMESTAMP_WITH_LOCAL_TIME_ZONE` are not supported by Hive -* Flink's `TIMESTAMP_WITHOUT_TIME_ZONE` type cannot be mapped to Hive's `TIMESTAMP` type due to precision difference. -* Flink's `MULTISET` is not supported by Hive + + +Note that: + +* Flink doesn't support Hive's `UNION` type +* Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET` +* Flink's `INTERVAL` type cannot be mapped to Hive's `INTERVAL` type yet
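The mapping table and the limitations above can be condensed into a small lookup. The sketch below is illustrative only — it covers just the types named in this commit, and Flink's actual conversion lives in its Hive catalog internals, not in a helper like this hypothetical `toHiveType`:

```java
public class TypeMappingSketch {
    // Condenses the Flink -> Hive type-name rules from the doc above:
    // DATE -> DATE, TIMESTAMP -> TIMESTAMP, BYTES -> BINARY,
    // CHAR(p)/VARCHAR(p) capped at 255/65535, and a few unsupported types.
    static String toHiveType(String flinkType) {
        if (flinkType.startsWith("CHAR(")) {
            int p = Integer.parseInt(flinkType.substring(5, flinkType.length() - 1));
            if (p > 255) throw new IllegalArgumentException("Hive CHAR max length is 255");
            return flinkType;
        }
        if (flinkType.startsWith("VARCHAR(")) {
            int p = Integer.parseInt(flinkType.substring(8, flinkType.length() - 1));
            if (p > 65535) throw new IllegalArgumentException("Hive VARCHAR max length is 65535");
            return flinkType;
        }
        switch (flinkType) {
            case "DATE":      return "DATE";
            case "TIMESTAMP": return "TIMESTAMP";
            case "BYTES":     return "BINARY";
            case "MULTISET": // fall through: not supported by Hive
            case "TIMESTAMP_WITH_TIME_ZONE":
            case "TIMESTAMP_WITH_LOCAL_TIME_ZONE":
                throw new IllegalArgumentException(flinkType + " is not supported by Hive");
            default:
                return flinkType; // assume identical name (e.g. INT, STRING)
        }
    }

    public static void main(String[] args) {
        System.out.println(toHiveType("BYTES"));     // BINARY
        System.out.println(toHiveType("CHAR(255)")); // CHAR(255)
        try {
            toHiveType("MULTISET");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: MULTISET");
        }
    }
}
```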
[flink] branch master updated (940613b -> 26de396)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 940613b [hotfix][doc] update and remove limitations of Hive connector add 26de396 [hotfix][doc] update Hive doc No new revisions were added by this update. Summary of changes: docs/dev/table/hive/index.md| 17 - docs/dev/table/hive/index.zh.md | 17 - 2 files changed, 24 insertions(+), 10 deletions(-)
[flink] branch release-1.10 updated: [hotfix][doc] update and remove limitations of Hive connector
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 393bf85 [hotfix][doc] update and remove limitations of Hive connector 393bf85 is described below commit 393bf85ba6e8ebaeb94f83627dd2410d0215a7aa Author: bowen.li AuthorDate: Wed Dec 11 12:51:05 2019 -0800 [hotfix][doc] update and remove limitations of Hive connector --- docs/dev/table/hive/read_write_hive.md| 21 +++-- docs/dev/table/hive/read_write_hive.zh.md | 21 +++-- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/docs/dev/table/hive/read_write_hive.md b/docs/dev/table/hive/read_write_hive.md index 4a48060..9814ceb 100644 --- a/docs/dev/table/hive/read_write_hive.md +++ b/docs/dev/table/hive/read_write_hive.md @@ -24,7 +24,6 @@ under the License. Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write from Hive data as an alternative to Hive's batch engine. Be sure to follow the instructions to include the correct [dependencies]({{ site.baseurl }}/dev/table/hive/#depedencies) in your application. - * This will be replaced by the TOC {:toc} @@ -118,14 +117,16 @@ Similarly, data can be written into hive using an `INSERT INTO` clause. Flink SQL> INSERT INTO mytable (name, value) VALUES ('Tom', 4.72); {% endhighlight %} -### Limitations +## Formats + +We have tested the following table storage formats: text, csv, SequenceFile, ORC, and Parquet. + +## Roadmap + +We are planning and actively working on supporting features like -The following is a list of major limitations of the Hive connector. And we're actively working to close these gaps. +- ACID tables +- bucketed tables +- more formats -1. INSERT OVERWRITE is not supported. -2. Inserting into partitioned tables is not supported. -3. ACID tables are not supported. -4. Bucketed tables are not supported. -5. 
Some data types are not supported. See the [limitations]({{ site.baseurl }}/dev/table/hive/#limitations) for details. -6. Only a limited number of table storage formats have been tested, namely text, SequenceFile, ORC, and Parquet. -7. Views are not supported. +Please reach out to the community with further feature requests: https://flink.apache.org/community.html#mailing-lists diff --git a/docs/dev/table/hive/read_write_hive.zh.md b/docs/dev/table/hive/read_write_hive.zh.md index 9b9bf77..9814ceb 100644 --- a/docs/dev/table/hive/read_write_hive.zh.md +++ b/docs/dev/table/hive/read_write_hive.zh.md @@ -24,7 +24,6 @@ under the License. Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write from Hive data as an alternative to Hive's batch engine. Be sure to follow the instructions to include the correct [dependencies]({{ site.baseurl }}/dev/table/hive/#depedencies) in your application. - * This will be replaced by the TOC {:toc} @@ -118,14 +117,16 @@ Similarly, data can be written into hive using an `INSERT INTO` clause. Flink SQL> INSERT INTO mytable (name, value) VALUES ('Tom', 4.72); {% endhighlight %} -### Limitations +## Formats + +We have tested the following table storage formats: text, csv, SequenceFile, ORC, and Parquet. + +## Roadmap + +We are planning and actively working on supporting features like -The following is a list of major limitations of the Hive connector. And we're actively working to close these gaps. +- ACID tables +- bucketed tables +- more formats -1. INSERT OVERWRITE is not supported. -2. Inserting into partitioned tables is not supported. -3. ACID tables are not supported. -4. Bucketed tables are not supported. -5. Some data types are not supported. See the [limitations]({{ site.baseurl }}/zh/dev/table/hive/#limitations) for details. -6. Only a limited number of table storage formats have been tested, namely text, SequenceFile, ORC, and Parquet. -7. Views are not supported. 
+Please reach out to the community with further feature requests: https://flink.apache.org/community.html#mailing-lists
[flink] branch master updated (9277e9e -> 940613b)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 9277e9e [FLINK-15199][rocksdb] Make RocksDBResourceContainer public for the benchmarks add 940613b [hotfix][doc] update and remove limitations of Hive connector No new revisions were added by this update. Summary of changes: docs/dev/table/hive/read_write_hive.md| 21 +++-- docs/dev/table/hive/read_write_hive.zh.md | 21 +++-- 2 files changed, 22 insertions(+), 20 deletions(-)
[flink] branch release-1.10 updated: [hotfix][doc] update Hive doc
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 26b8002 [hotfix][doc] update Hive doc 26b8002 is described below commit 26b8002c0a90d9cb452a2700f58b221a3f0c05c4 Author: bowen.li AuthorDate: Tue Dec 10 20:03:46 2019 -0800 [hotfix][doc] update Hive doc --- docs/dev/table/hive/index.md| 17 - docs/dev/table/hive/index.zh.md | 17 - 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index cc0c716..9a83882 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -75,7 +75,7 @@ Flink supports the following Hive versions. ### Dependencies -To integrate with Hive, users need the following dependencies in their project. +To integrate with Hive, users need some dependencies in their project. We are using Hive 2.3.4 and 1.2.1 as examples here. @@ -96,7 +96,9 @@ To integrate with Hive, users need the following dependencies in their project. provided - + org.apache.flink @@ -132,7 +134,9 @@ To integrate with Hive, users need the following dependencies in their project. provided - + org.apache.flink @@ -167,13 +171,16 @@ To integrate with Hive, users need the following dependencies in their project. Connect to an existing Hive installation using the Hive [Catalog]({{ site.baseurl }}/dev/table/catalogs.html) through the table environment or YAML configuration. +If the `hive-conf/hive-site.xml` file is stored in a remote storage system, users should download +the Hive configuration file to their local environment first. 
+ {% highlight java %} String name= "myhive"; String defaultDatabase = "mydatabase"; -String hiveConfDir = "/opt/hive-conf"; +String hiveConfDir = "/opt/hive-conf"; // a local path String version = "2.3.4"; // or 1.2.1 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); @@ -185,7 +192,7 @@ tableEnv.registerCatalog("myhive", hive); val name= "myhive" val defaultDatabase = "mydatabase" -val hiveConfDir = "/opt/hive-conf" +val hiveConfDir = "/opt/hive-conf" // a local path val version = "2.3.4" // or 1.2.1 val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version) diff --git a/docs/dev/table/hive/index.zh.md b/docs/dev/table/hive/index.zh.md index 205ecfe..1136ffe 100644 --- a/docs/dev/table/hive/index.zh.md +++ b/docs/dev/table/hive/index.zh.md @@ -75,7 +75,7 @@ Flink supports the following Hive versions. ### Dependencies -To integrate with Hive, users need the following dependencies in their project. +To integrate with Hive, users need some dependencies in their project. We are using Hive 2.3.4 and 1.2.1 as examples here. @@ -96,7 +96,9 @@ To integrate with Hive, users need the following dependencies in their project. provided - + org.apache.flink @@ -132,7 +134,9 @@ To integrate with Hive, users need the following dependencies in their project. provided - + org.apache.flink @@ -167,13 +171,16 @@ To integrate with Hive, users need the following dependencies in their project. Connect to an existing Hive installation using the Hive [Catalog]({{ site.baseurl }}/dev/table/catalogs.html) through the table environment or YAML configuration. +If the `hive-conf/hive-site.xml` file is stored in a remote storage system, users should download +the Hive configuration file to their local environment first. 
+ {% highlight java %} String name= "myhive"; String defaultDatabase = "mydatabase"; -String hiveConfDir = "/opt/hive-conf"; +String hiveConfDir = "/opt/hive-conf"; // a local path String version = "2.3.4"; // or 1.2.1 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); @@ -185,7 +192,7 @@ tableEnv.registerCatalog("myhive", hive); val name= "myhive" val defaultDatabase = "mydatabase" -val hiveConfDir = "/opt/hive-conf" +val hiveConfDir = "/opt/hive-conf" // a local path val version = "2.3.4" // or 1.2.1 val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
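For the YAML route mentioned above, a SQL Client catalog entry equivalent to the Java/Scala snippets would look roughly like the following. The property names are taken from the Flink 1.10 SQL Client documentation and should be verified against the release in use:

```yaml
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /opt/hive-conf   # a local path, as noted above
    hive-version: 2.3.4             # or 1.2.1
```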
[flink] branch master updated (04ab225 -> 9277e9e)
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 04ab225 [FLINK-15069][benchmark] Supplement the pipelined shuffle compression case for benchmark add 9277e9e [FLINK-15199][rocksdb] Make RocksDBResourceContainer public for the benchmarks No new revisions were added by this update. Summary of changes: .../flink/contrib/streaming/state/RocksDBResourceContainer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated (07f08fb -> 04ab225)
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 07f08fb [FLINK-14926][state-backend-rocksdb] (follow-up) Replace OptionsFactory for RocksDBOptionsFactory add 04ab225 [FLINK-15069][benchmark] Supplement the pipelined shuffle compression case for benchmark No new revisions were added by this update. Summary of changes: ...a => StreamNetworkCompressionThroughputBenchmark.java} | 15 ++- ... StreamNetworkCompressionThroughputBenchmarkTest.java} | 6 +++--- 2 files changed, 13 insertions(+), 8 deletions(-) copy flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/{StreamNetworkBroadcastThroughputBenchmark.java => StreamNetworkCompressionThroughputBenchmark.java} (74%) copy flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/{StreamNetworkBroadcastThroughputBenchmarkTest.java => StreamNetworkCompressionThroughputBenchmarkTest.java} (83%)
[flink] 03/04: [hotfix][kubernetes] Separate creating and retrieving flink cluster logs
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit d61ef1c3f02068d8fb7a23dd4d71db643c0491f1 Author: wangyang0918 AuthorDate: Mon Dec 9 20:40:46 2019 +0800 [hotfix][kubernetes] Separate creating and retrieving flink cluster logs Currently, when we create a flink cluster on kubernetes, two retrieving logs and one creating log will show up. It is so confusing. The logs of creating and retrieving a flink cluster should be separated. --- .../kubernetes/KubernetesClusterDescriptor.java| 40 ++ 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java index ff04a59..9d11954 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -91,14 +91,7 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor { } try { - - RestClusterClient resultClient = new RestClusterClient<>( - configuration, clusterId); - LOG.info( - "Succesfully retrieved cluster client for cluster {}, JobManager Web Interface : {}", - clusterId, - resultClient.getWebInterfaceURL()); - return resultClient; + return new RestClusterClient<>(configuration, clusterId); } catch (Exception e) { client.handleException(e); throw new RuntimeException(new ClusterRetrieveException("Could not create the RestClusterClient.", e)); @@ -108,17 +101,31 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor { @Override public ClusterClientProvider retrieve(String clusterId) { - return createClusterClientProvider(clusterId); + final ClusterClientProvider clusterClientProvider = createClusterClientProvider(clusterId); + + try 
(ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Retrieve flink cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; } @Override public ClusterClientProvider deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException { - final ClusterClientProvider clusterClient = deployClusterInternal( + final ClusterClientProvider clusterClientProvider = deployClusterInternal( KubernetesSessionClusterEntrypoint.class.getName(), clusterSpecification, false); - return clusterClient; + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink session cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; } @Override @@ -180,16 +187,7 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor { client.createConfigMap(); client.createFlinkMasterDeployment(clusterSpecification); - ClusterClientProvider clusterClientProvider = createClusterClientProvider(clusterId); - - try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { - LOG.info( - "Create flink session cluster {} successfully, JobManager Web Interface: {}", - clusterId, - clusterClient.getWebInterfaceURL()); - } - - return clusterClientProvider; + return createClusterClientProvider(clusterId); } catch (Exception e) { client.handleException(e); throw new ClusterDeploymentException("Coul
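The pattern used in the patch above — obtaining the `ClusterClient` only to log its web interface URL, inside try-with-resources so it is always closed — can be reduced to a self-contained sketch. The `ToyClient` type here is a stand-in, not Flink's `ClusterClient`:

```java
// Illustrates the try-with-resources pattern from the patch: a client opened
// just for logging is closed automatically, even if the logging call throws.
public class TryWithResourcesSketch {
    static final class ToyClient implements AutoCloseable {
        String getWebInterfaceURL() { return "http://jobmanager:8081"; }
        @Override public void close() { System.out.println("client closed"); }
    }

    public static void main(String[] args) {
        try (ToyClient client = new ToyClient()) {
            System.out.println("JobManager Web Interface: " + client.getWebInterfaceURL());
        }
        // close() has already run here; the provider object itself stays usable.
    }
}
```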
[flink] 01/04: [hotfix][kubernetes] Remove per-job related config options
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit ef4ad4491980a59a1722af9ba61bd09f4dbdfd53 Author: wangyang0918 AuthorDate: Mon Dec 9 18:06:33 2019 +0800 [hotfix][kubernetes] Remove per-job related config options --- .../kubernetes/cli/FlinkKubernetesCustomCli.java | 28 -- .../flink/kubernetes/cli/KubernetesCliOptions.java | 16 - 2 files changed, 44 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java index 620abb1..9176e9a 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java @@ -33,7 +33,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; @@ -57,8 +56,6 @@ import static org.apache.flink.kubernetes.cli.KubernetesCliOptions.CLUSTER_ID_OP import static org.apache.flink.kubernetes.cli.KubernetesCliOptions.DYNAMIC_PROPERTY_OPTION; import static org.apache.flink.kubernetes.cli.KubernetesCliOptions.HELP_OPTION; import static org.apache.flink.kubernetes.cli.KubernetesCliOptions.IMAGE_OPTION; -import static org.apache.flink.kubernetes.cli.KubernetesCliOptions.JOB_CLASS_NAME_OPTION; -import static org.apache.flink.kubernetes.cli.KubernetesCliOptions.JOB_ID_OPTION; 
/** * Kubernetes customized commandline. @@ -85,9 +82,6 @@ public class FlinkKubernetesCustomCli extends AbstractCustomCommandLine { private final Option dynamicPropertiesOption; private final Option helpOption; - private final Option jobClassOption; - private final Option jobIdOption; - private final ClusterClientServiceLoader clusterClientServiceLoader; public FlinkKubernetesCustomCli(Configuration configuration, String shortPrefix, String longPrefix) { @@ -115,9 +109,6 @@ public class FlinkKubernetesCustomCli extends AbstractCustomCommandLine { this.taskManagerSlotsOption = KubernetesCliOptions.getOptionWithPrefix( KubernetesCliOptions.TASK_MANAGER_SLOTS_OPTION, shortPrefix, longPrefix); - this.jobClassOption = KubernetesCliOptions.getOptionWithPrefix(JOB_CLASS_NAME_OPTION, shortPrefix, longPrefix); - this.jobIdOption = KubernetesCliOptions.getOptionWithPrefix(JOB_ID_OPTION, shortPrefix, longPrefix); - this.helpOption = KubernetesCliOptions.getOptionWithPrefix(HELP_OPTION, shortPrefix, longPrefix); } @@ -197,25 +188,6 @@ public class FlinkKubernetesCustomCli extends AbstractCustomCommandLine { effectiveConfiguration.setString(key, dynamicProperties.getProperty(key)); } - final StringBuilder entryPointClassArgs = new StringBuilder(); - if (commandLine.hasOption(jobClassOption.getOpt())) { - entryPointClassArgs.append(" --") - .append(jobClassOption.getLongOpt()) - .append(" ") - .append(commandLine.getOptionValue(jobClassOption.getOpt())); - } - - if (commandLine.hasOption(jobIdOption.getOpt())) { - entryPointClassArgs.append(" --") - .append(jobIdOption.getLongOpt()) - .append(" ") - .append(commandLine.getOptionValue(jobIdOption.getOpt())); - } - if (!entryPointClassArgs.toString().isEmpty()) { - effectiveConfiguration.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS_ARGS, - entryPointClassArgs.toString()); - } - return effectiveConfiguration; } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java index 48153bc..b71eb00 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java @@ -91,22 +91,6
[flink] 04/04: [FLINK-15153][kubernetes] Service selector needs to contain jobmanager component label
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 8f7ad848964c00bee490be0a4ea6d715426179d8 Author: wangyang0918 AuthorDate: Mon Dec 9 18:08:03 2019 +0800 [FLINK-15153][kubernetes] Service selector needs to contain jobmanager component label The jobmanager label needs to be added to service selector. Otherwise, it may select the wrong backend pods(taskmanager). --- .../flink/kubernetes/kubeclient/decorators/ServiceDecorator.java | 9 - .../apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java| 3 +++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java index 98c4634..f8c4493 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ServiceDecorator.java @@ -33,6 +33,7 @@ import io.fabric8.kubernetes.api.model.ServiceSpec; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Setup services port. 
@@ -89,7 +90,13 @@ public class ServiceDecorator extends Decorator { } spec.setPorts(servicePorts); - spec.setSelector(resource.getMetadata().getLabels()); + + final Map labels = new LabelBuilder() + .withExist(resource.getMetadata().getLabels()) + .withJobManagerComponent() + .toLabels(); + + spec.setSelector(labels); resource.setSpec(spec); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java index 17b52c7..08ebedf 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java @@ -106,6 +106,8 @@ public class Fabric8ClientTest extends KubernetesTestBase { assertEquals(KubernetesConfigOptions.ServiceExposedType.ClusterIP.toString(), service.getSpec().getType()); + // The selector labels should contain jobmanager component + labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER); assertEquals(labels, service.getSpec().getSelector()); assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()), @@ -133,6 +135,7 @@ public class Fabric8ClientTest extends KubernetesTestBase { assertEquals(KubernetesConfigOptions.ServiceExposedType.LoadBalancer.toString(), service.getSpec().getType()); + labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER); assertEquals(labels, service.getSpec().getSelector()); assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()),
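The fix above builds the Service selector by merging the resource's existing labels with the jobmanager component label, so the selector can no longer match taskmanager pods. A minimal sketch of that merge, with the label key and value written as plain strings (in Flink they come from `Constants.LABEL_COMPONENT_KEY` and `Constants.LABEL_COMPONENT_JOB_MANAGER`):

```java
import java.util.HashMap;
import java.util.Map;

public class SelectorSketch {
    // Mimics the LabelBuilder flow in the patch: start from the resource's
    // existing labels, then add the jobmanager component label so the
    // Service selector only matches jobmanager pods.
    static Map<String, String> selectorLabels(Map<String, String> resourceLabels) {
        Map<String, String> labels = new HashMap<>(resourceLabels);
        labels.put("component", "jobmanager");
        return labels;
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put("app", "my-flink-cluster");
        Map<String, String> selector = selectorLabels(base);
        System.out.println(selector.get("app"));       // my-flink-cluster
        System.out.println(selector.get("component")); // jobmanager
    }
}
```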
[flink] 02/04: [hotfix][kubernetes] Correct the description of cli options
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit c1651914a562eb58ad92bc7931d9110ae4b7b05f Author: wangyang0918 AuthorDate: Tue Dec 10 14:39:02 2019 +0800 [hotfix][kubernetes] Correct the description of cli options Currently, some cli options descriptions could not show correctly. It is just because KubernetesConfigOptions.CONTAINER_IMAGE.description().toString() is used. The Description#toString() does not represent the real description content. --- docs/_includes/generated/kubernetes_config_configuration.html | 2 +- .../java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java | 7 +++ .../flink/kubernetes/configuration/KubernetesConfigOptions.java| 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/docs/_includes/generated/kubernetes_config_configuration.html b/docs/_includes/generated/kubernetes_config_configuration.html index 932a21e..192b0d6 100644 --- a/docs/_includes/generated/kubernetes_config_configuration.html +++ b/docs/_includes/generated/kubernetes_config_configuration.html @@ -12,7 +12,7 @@ kubernetes.cluster-id (none) String -The cluster id that will be used for flink cluster. If it's not set, the client will generate a random UUID name. +The cluster id used for identifying the unique flink cluster. If it's not set, the client will generate a random UUID name. 
kubernetes.config.file diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java index b71eb00..c95e04c 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesCliOptions.java @@ -18,8 +18,6 @@ package org.apache.flink.kubernetes.cli; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; - import org.apache.commons.cli.Option; /** @@ -46,7 +44,8 @@ public class KubernetesCliOptions { .longOpt("clusterId") .required(false) .hasArg(true) - .desc(KubernetesConfigOptions.CLUSTER_ID.description().toString()) + .desc("The cluster id used for identifying the unique flink cluster. If it's not set, the client will generate " + + "a random UUID name.") .build(); public static final Option IMAGE_OPTION = Option.builder("i") @@ -54,7 +53,7 @@ public class KubernetesCliOptions { .required(false) .hasArg(true) .argName("image-name") - .desc(KubernetesConfigOptions.CONTAINER_IMAGE.description().toString()) + .desc("Image to use for Flink containers") .build(); public static final Option JOB_MANAGER_MEMORY_OPTION = Option.builder("jm") diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index d65d4fb..6136d5e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -97,7 +97,7 @@ public class KubernetesConfigOptions { key("kubernetes.cluster-id") .stringType() .noDefaultValue() - .withDescription("The cluster id that will be used for flink cluster. 
If it's not set, " + + .withDescription("The cluster id used for identifying the unique flink cluster. If it's not set, " + "the client will generate a random UUID name."); public static final ConfigOption CONTAINER_IMAGE =
[flink] branch release-1.10 updated (97899cf -> 8f7ad84)
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 97899cf [FLINK-15008][build] Bundle javax.activation-api for Java 11 new ef4ad44 [hotfix][kubernetes] Remove per-job related config options new c165191 [hotfix][kubernetes] Correct the description of cli options new d61ef1c [hotfix][kubernetes] Separate creating and retrieving flink cluster logs new 8f7ad84 [FLINK-15153][kubernetes] Service selector needs to contain jobmanager component label The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../generated/kubernetes_config_configuration.html | 2 +- .../kubernetes/KubernetesClusterDescriptor.java| 40 ++ .../kubernetes/cli/FlinkKubernetesCustomCli.java | 28 --- .../flink/kubernetes/cli/KubernetesCliOptions.java | 23 ++--- .../configuration/KubernetesConfigOptions.java | 2 +- .../kubeclient/decorators/ServiceDecorator.java| 9 - .../kubernetes/kubeclient/Fabric8ClientTest.java | 3 ++ 7 files changed, 35 insertions(+), 72 deletions(-)
[flink] 01/11: [FLINK-14926][state-backend-rocksdb] Ensure that RocksObjects are always closed on backend disposal
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e05618b427f22cb73694658f5a21fb1767709efa Author: Yu Li AuthorDate: Sun Nov 24 16:20:09 2019 +0800 [FLINK-14926][state-backend-rocksdb] Ensure that RocksObjects are always closed on backend disposal This also ensures that the newly introduced shared resources (shared cache and write buffers) are properly closed when the RocksDB state backend is disposed. This closes #10300 --- .../state/DefaultConfigurableOptionsFactory.java | 5 +- .../contrib/streaming/state/OptionsFactory.java| 30 - .../contrib/streaming/state/PredefinedOptions.java | 99 .../streaming/state/RocksDBKeyedStateBackend.java | 11 +- .../state/RocksDBKeyedStateBackendBuilder.java | 23 ++-- .../streaming/state/RocksDBResourceContainer.java | 122 .../streaming/state/RocksDBStateBackend.java | 62 +- .../contrib/streaming/state/RocksDBResource.java | 20 ++-- .../state/RocksDBResourceContainerTest.java| 125 + .../state/RocksDBStateBackendConfigTest.java | 19 ++-- .../streaming/state/RocksDBStateBackendTest.java | 15 +-- .../contrib/streaming/state/RocksDBTestUtils.java | 13 +-- 12 files changed, 442 insertions(+), 102 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java index 962a12e..12c0eb9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java @@ -31,6 +31,7 @@ import org.rocksdb.PlainTableConfig; import org.rocksdb.TableFormatConfig;
import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -59,7 +60,7 @@ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFac private final Map configuredOptions = new HashMap<>(); @Override - public DBOptions createDBOptions(DBOptions currentOptions) { + public DBOptions createDBOptions(DBOptions currentOptions, Collection handlesToClose) { if (isOptionConfigured(MAX_BACKGROUND_THREADS)) { currentOptions.setIncreaseParallelism(getMaxBackgroundThreads()); } @@ -72,7 +73,7 @@ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFac } @Override - public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection handlesToClose) { if (isOptionConfigured(COMPACTION_STYLE)) { currentOptions.setCompactionStyle(getCompactionStyle()); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java index 3204580..79815b9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java @@ -21,6 +21,8 @@ package org.apache.flink.contrib.streaming.state; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import java.util.Collection; + /** * A factory for {@link DBOptions} and {@link ColumnFamilyOptions} to be passed to the {@link RocksDBStateBackend}. 
* Options have to be created lazily by this factory, because the {@code Options} @@ -52,9 +54,21 @@ public interface OptionsFactory extends java.io.Serializable { * the setter methods, otherwise the pre-defined options may get lost. * * @param currentOptions The options object with the pre-defined options. +* @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s. +* @return The options object on which the additional options are set. +*/ + DBOptions createDBOptions(DBOptions currentOptions, Collection handlesToClose); + + /** +* Set the additional options on top of the current options object. +* +* @param currentOptions The options object with the pre-defined option
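The signature change above threads a `Collection` of handles through every factory method so that natively-backed objects created by the factory can be tracked and closed on backend disposal. A minimal, self-contained sketch of that tracking pattern (plain `AutoCloseable` stands in for `org.rocksdb.RocksObject`; the names here are illustrative, not Flink's real types):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * A factory receives a collection into which it registers every
 * closeable object it creates, so the caller can dispose them all later.
 */
interface TrackedOptionsFactory {
    AutoCloseable createOptions(Collection<AutoCloseable> handlesToClose);
}

class TrackedCleanupDemo {
    static int createAndCloseAll() throws Exception {
        List<AutoCloseable> handles = new ArrayList<>();
        TrackedOptionsFactory factory = h -> {
            AutoCloseable opt = () -> { };  // stands in for e.g. new DBOptions()
            h.add(opt);                     // register for later cleanup
            return opt;
        };
        factory.createOptions(handles);
        factory.createOptions(handles);
        // analogous to backend disposal: close everything the factory created
        for (AutoCloseable h : handles) {
            h.close();
        }
        return handles.size();
    }
}
```

Without this registration step, option objects created inside a factory would leak their native memory when the backend is disposed.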
[flink] 04/11: [hotfix][tests] Ensure RocksDB native library is loaded into temp directory
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 3820bdd364a86f2acebe54577a36c01499dfe326 Author: Stephan Ewen AuthorDate: Mon Dec 9 19:09:26 2019 +0100 [hotfix][tests] Ensure RocksDB native library is loaded into temp directory This moves the loading code from one specific test method into the initialization phase. That way the extraction happens into the target location also in cases where other test methods execute first. --- .../streaming/state/RocksDBResourceContainerTest.java| 16 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java index f72dd1a..5ec3c04 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java @@ -21,7 +21,8 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.util.function.ThrowingRunnable; -import org.junit.Rule; +import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.rocksdb.ColumnFamilyOptions; @@ -30,6 +31,7 @@ import org.rocksdb.LRUCache; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.WriteBufferManager; +import java.io.IOException; import java.util.ArrayList; import static org.hamcrest.CoreMatchers.is; @@ -40,8 +42,15 @@ import static org.hamcrest.MatcherAssert.assertThat; */ public class RocksDBResourceContainerTest { - @Rule -
public final TemporaryFolder tmp = new TemporaryFolder(); + @ClassRule + public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + + @BeforeClass + public static void ensureRocksDbNativeLibraryLoaded() throws IOException { + NativeLibraryLoader.getInstance().loadLibrary(TMP_FOLDER.newFolder().getAbsolutePath()); + } + + // @Test public void testFreeDBOptionsAfterClose() throws Exception { @@ -108,7 +117,6 @@ public class RocksDBResourceContainerTest { @Test public void testFreeSharedResourcesAfterClose() throws Exception { - NativeLibraryLoader.getInstance().loadLibrary(tmp.newFolder().getAbsolutePath()); RocksDBResourceContainer container = new RocksDBResourceContainer(); LRUCache cache = new LRUCache(1024L); WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
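The fix above hoists the native-library extraction from one test method into class-level setup (`@ClassRule` + `@BeforeClass`), so it runs regardless of which test executes first. A plain-Java analogue of that "run the expensive setup exactly once, up front" pattern, with no JUnit dependency (the `NativeLibraryLoader` call is only indicated in a comment; this helper class is an illustration, not code from the commit):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Idempotent one-time setup: the first caller performs the extraction,
 * every later caller reuses the same directory.
 */
final class OneTimeNativeSetup {
    private static Path extractionDir;  // shared across all "tests"

    static synchronized Path ensureLoaded() throws IOException {
        if (extractionDir == null) {
            extractionDir = Files.createTempDirectory("native-lib");
            // real code would call
            //   NativeLibraryLoader.getInstance().loadLibrary(extractionDir.toString())
            // here, exactly once
        }
        return extractionDir;
    }
}
```

Because the initialization is idempotent, test execution order no longer determines where the library lands.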
[flink] 08/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify test to rely on RocksDBResourceContainer for cleanup of native handles
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 93deb1a709840c2ae8957ea88cd8ffc0929951ad Author: Stephan Ewen AuthorDate: Mon Dec 9 20:22:08 2019 +0100 [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify test to rely on RocksDBResourceContainer for cleanup of native handles --- .../contrib/streaming/state/RocksDBResourceContainer.java| 6 ++ .../flink/contrib/streaming/state/RocksDBStateBackend.java | 12 +--- .../contrib/streaming/state/RocksDBStateBackendTest.java | 12 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index 32c198f..20c3c5c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -106,6 +106,12 @@ final class RocksDBResourceContainer implements AutoCloseable { return opt; } + RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) { + return optionsFactory == null + ? 
defaultMetricOptions + : optionsFactory.createNativeMetricsOptions(defaultMetricOptions); + } + @Nullable OptionsFactory getOptionsFactory() { return optionsFactory; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 829566d..3ffb021 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -560,7 +560,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu .setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled()) .setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled()) .setNumberOfTransferingThreads(getNumberOfTransferThreads()) - .setNativeMetricOptions(getMemoryWatcherOptions(resourceContainer)); + .setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(defaultMetricOptions)); return builder.build(); } @@ -853,16 +853,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu return resourceContainer.getColumnOptions(); } - public RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBResourceContainer resourceContainer) { - RocksDBNativeMetricOptions options = this.defaultMetricOptions; - OptionsFactory optionsFactory = resourceContainer.getOptionsFactory(); - if (optionsFactory != null) { - options = optionsFactory.createNativeMetricsOptions(options); - } - - return options; - } - /** * Gets the number of threads used to transfer files while snapshotting/restoring. 
*/ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index e59f6e1..b31e190 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -115,14 +115,12 @@ public class RocksDBStateBackendTest extends StateBackendTestBase handlesToClose = new ArrayList<>(); + private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer(); public void prepareRocksDB() throws Exception { String dbPath = new File(tempFolder.newFolder(), DB_INSTANCE_DIR_STRING).getAbsolutePath(); - columnOptions = PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose); - optionsContainer = new RocksDBResourceContainer(); + ColumnFamil
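The refactor above replaces a mutable local plus `if (optionsFactory != null)` branch with a single ternary that falls back to the defaults when no factory is configured. A self-contained sketch of that delegation shape (`String` stands in for `RocksDBNativeMetricOptions`; the names mirror the diff but this is an illustration):

```java
/**
 * The container resolves options itself: defaults when no factory is
 * configured, otherwise whatever the factory derives from the defaults.
 */
final class MetricOptionsContainer {
    interface Factory {
        String createNativeMetricsOptions(String defaults);
    }

    private final Factory factory;  // may be null: no factory configured

    MetricOptionsContainer(Factory factory) {
        this.factory = factory;
    }

    String getMemoryWatcherOptions(String defaultMetricOptions) {
        return factory == null
            ? defaultMetricOptions
            : factory.createNativeMetricsOptions(defaultMetricOptions);
    }
}
```

Moving this method onto the container also removes one more reason for the backend class to reach into the container's internals.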
[flink] 03/11: [hotfix][tests] Use assumptions rather than manual checks in RocksDBStateBackendConfigTest to report skipped tests properly
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 08554b9b1733fc148397c9179681f54a6cb24ef0 Author: Stephan Ewen AuthorDate: Mon Dec 9 20:32:13 2019 +0100 [hotfix][tests] Use assumptions rather than manual checks in RocksDBStateBackendConfigTest to report skipped tests properly --- .../state/RocksDBStateBackendConfigTest.java| 21 +++-- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index b106000..bd2a34a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.IOUtils; import org.junit.Assert; +import org.junit.Assume; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -311,16 +312,13 @@ public class RocksDBStateBackendConfigTest { @Test public void testFailWhenNoLocalStorageDir() throws Exception { + final File targetDir = tempFolder.newFolder(); + Assume.assumeTrue("Cannot mark directory non-writable", targetDir.setWritable(false, false)); + String checkpointPath = tempFolder.newFolder().toURI().toString(); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); - File targetDir = tempFolder.newFolder(); try { - if (!targetDir.setWritable(false, false)) { - System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because cannot mark
directory non-writable"); - return; - } - rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath()); boolean hasFailure = false; @@ -354,19 +352,14 @@ public class RocksDBStateBackendConfigTest { @Test public void testContinueOnSomeDbDirectoriesMissing() throws Exception { - File targetDir1 = tempFolder.newFolder(); - File targetDir2 = tempFolder.newFolder(); + final File targetDir1 = tempFolder.newFolder(); + final File targetDir2 = tempFolder.newFolder(); + Assume.assumeTrue("Cannot mark directory non-writable", targetDir1.setWritable(false, false)); String checkpointPath = tempFolder.newFolder().toURI().toString(); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); try { - - if (!targetDir1.setWritable(false, false)) { - System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory non-writable"); - return; - } - rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath()); try {
[flink] 07/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Make RocksDBResourceContainer immutable
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 0602ccf1d23c41bdbf335f6fb3e4cc39d7d5d7d1 Author: Stephan Ewen AuthorDate: Mon Dec 9 19:42:00 2019 +0100 [FLINK-14926][state-backend-rocksdb] (follow-up) Make RocksDBResourceContainer immutable --- .../streaming/state/RocksDBResourceContainer.java | 50 +++--- .../streaming/state/RocksDBStateBackend.java | 31 +++--- .../state/RocksDBResourceContainerTest.java| 7 ++- 3 files changed, 43 insertions(+), 45 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index 4cb18d8..32c198f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -24,11 +24,12 @@ import org.apache.flink.util.IOUtils; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * The container for RocksDB resources, including predefined options, option factory and * shared resource among instances. @@ -39,22 +40,37 @@ import java.util.ArrayList; final class RocksDBResourceContainer implements AutoCloseable { /** The pre-configured option settings. */ - private PredefinedOptions predefinedOptions; + private final PredefinedOptions predefinedOptions; /** The options factory to create the RocksDB options. 
*/ @Nullable - private OptionsFactory optionsFactory; + private final OptionsFactory optionsFactory; /** The shared resource among RocksDB instances. This resource is not part of the 'handlesToClose', * because the handles to close are closed quietly, whereas for this one, we want exceptions to be reported. */ @Nullable - private OpaqueMemoryResource sharedResources; + private final OpaqueMemoryResource sharedResources; /** The handles to be closed when the container is closed. */ private final ArrayList handlesToClose; public RocksDBResourceContainer() { - handlesToClose = new ArrayList<>(); + this(PredefinedOptions.DEFAULT, null, null); + } + + public RocksDBResourceContainer(PredefinedOptions predefinedOptions, @Nullable OptionsFactory optionsFactory) { + this(predefinedOptions, optionsFactory, null); + } + + public RocksDBResourceContainer( + PredefinedOptions predefinedOptions, + @Nullable OptionsFactory optionsFactory, + @Nullable OpaqueMemoryResource sharedResources) { + + this.predefinedOptions = checkNotNull(predefinedOptions); + this.optionsFactory = optionsFactory; + this.sharedResources = sharedResources; + this.handlesToClose = new ArrayList<>(); } /** @@ -62,7 +78,7 @@ final class RocksDBResourceContainer implements AutoCloseable { */ DBOptions getDbOptions() { // initial options from pre-defined profile - DBOptions opt = checkAndGetPredefinedOptions().createDBOptions(handlesToClose); + DBOptions opt = predefinedOptions.createDBOptions(handlesToClose); // add user-defined options factory, if specified if (optionsFactory != null) { @@ -80,7 +96,7 @@ final class RocksDBResourceContainer implements AutoCloseable { */ ColumnFamilyOptions getColumnOptions() { // initial options from pre-defined profile - ColumnFamilyOptions opt = checkAndGetPredefinedOptions().createColumnOptions(handlesToClose); + ColumnFamilyOptions opt = predefinedOptions.createColumnOptions(handlesToClose); // add user-defined options, if specified if (optionsFactory != null) { @@ 
-90,29 +106,11 @@ final class RocksDBResourceContainer implements AutoCloseable { return opt; } - PredefinedOptions checkAndGetPredefinedOptions() { - if (predefinedOptions == null) { - predefinedOptions = PredefinedOptions.DEFAULT; - } - return predefinedOptions; - } - + @Nullable OptionsFactory getOptionsFactory() { return optionsFactory; } - void setPredefinedOpt
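The commit above makes every field of the container `final`, validated once by a canonical constructor, with the shorter constructors delegating to it (so the lazy `checkAndGetPredefinedOptions()` fallback can be deleted). A self-contained sketch of that immutability refactor (`String` fields stand in for the real option types; `Objects.requireNonNull` plays the role of Flink's `checkNotNull`; the names mirror the diff but this is an illustration):

```java
import java.util.Objects;

/**
 * Immutable container: all state is final, nullability is part of the
 * contract, and validation happens in exactly one constructor.
 */
final class ImmutableContainerSketch {
    private final String predefinedOptions;  // never null
    private final String optionsFactory;     // nullable
    private final String sharedResources;    // nullable

    ImmutableContainerSketch() {
        this("DEFAULT", null, null);  // default profile, no factory/resources
    }

    ImmutableContainerSketch(String predefinedOptions, String optionsFactory) {
        this(predefinedOptions, optionsFactory, null);
    }

    ImmutableContainerSketch(String predefinedOptions, String optionsFactory, String sharedResources) {
        this.predefinedOptions = Objects.requireNonNull(predefinedOptions);
        this.optionsFactory = optionsFactory;
        this.sharedResources = sharedResources;
    }

    String getPredefinedOptions() {
        return predefinedOptions;
    }
}
```

With the fields final, callers can no longer mutate the container after the backend has started handing out options built from it.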
[flink] 10/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify collecting the option objects to close
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 2405111bd62595e728669ad7753b1aa0be960ae6 Author: Stephan Ewen AuthorDate: Tue Dec 10 11:52:59 2019 +0100 [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify collecting the option objects to close - rather than letting each option factory method add the option to the list itself, add it in one place in the Resource Container. - assume the list passed to the factory methods is always non-null --- .../contrib/streaming/state/PredefinedOptions.java | 63 +- .../streaming/state/RocksDBResourceContainer.java | 2 + 2 files changed, 15 insertions(+), 50 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java index a629756..47c7d12 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java @@ -25,6 +25,7 @@ import org.rocksdb.CompactionStyle; import org.rocksdb.DBOptions; import org.rocksdb.InfoLogLevel; +import java.util.ArrayList; import java.util.Collection; /** @@ -58,24 +59,15 @@ public enum PredefinedOptions { @Override public DBOptions createDBOptions(Collection handlesToClose) { - DBOptions dbOptions = - new DBOptions() + return new DBOptions() .setUseFsync(false) .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) .setStatsDumpPeriodSec(0); - if (handlesToClose != null) { - handlesToClose.add(dbOptions); - } - return dbOptions; } @Override public ColumnFamilyOptions createColumnOptions(Collection handlesToClose) { - ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions(); - if (handlesToClose != null) { - handlesToClose.add(columnFamilyOptions); - } - return columnFamilyOptions; + return new ColumnFamilyOptions(); } }, @@ -106,29 +98,19 @@ public enum PredefinedOptions { @Override public DBOptions createDBOptions(Collection handlesToClose) { - DBOptions dbOptions = - new DBOptions() + return new DBOptions() .setIncreaseParallelism(4) .setUseFsync(false) .setMaxOpenFiles(-1) .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) .setStatsDumpPeriodSec(0); - if (handlesToClose != null) { - handlesToClose.add(dbOptions); - } - return dbOptions; } @Override public ColumnFamilyOptions createColumnOptions(Collection handlesToClose) { - ColumnFamilyOptions columnFamilyOptions = - new ColumnFamilyOptions() + return new ColumnFamilyOptions() .setCompactionStyle(CompactionStyle.LEVEL) .setLevelCompactionDynamicLevelBytes(true); - if (handlesToClose != null) { - handlesToClose.add(columnFamilyOptions); - } - return columnFamilyOptions; } }, @@ -164,18 +146,12 @@ public enum PredefinedOptions { @Override public DBOptions createDBOptions(Collection handlesToClose) { - - DBOptions dbOptions = - new DBOptions() + return new DBOptions() .setIncreaseParallelism(4) .setUseFsync(false) .setMaxOpenFiles(-1) .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) .setStatsDumpPeriodSec(0); - if (han
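The simplification above moves handle registration out of every factory method (each with its own null check) into a single place in the resource container. A self-contained sketch of that "register in one place" shape (plain `AutoCloseable` stands in for RocksDB option objects; the names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Factories just create and return; the container alone appends each
 * created handle to its cleanup list.
 */
final class CentralizedCleanup {
    interface HandleFactory {
        AutoCloseable create();
    }

    private final List<AutoCloseable> handlesToClose = new ArrayList<>();

    AutoCloseable obtain(HandleFactory factory) {
        AutoCloseable handle = factory.create();  // factory never touches the list
        handlesToClose.add(handle);               // single registration point
        return handle;
    }

    int closeAll() throws Exception {
        for (AutoCloseable h : handlesToClose) {
            h.close();
        }
        return handlesToClose.size();
    }
}
```

Centralizing the `add` also lets the factory methods drop their `handlesToClose != null` guards, since the container always passes (or here, owns) a real list.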
[flink] 11/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Replace OptionsFactory with RocksDBOptionsFactory
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 07f08fb19a814cbac03b92e675fe567e7a62e510 Author: Stephan Ewen AuthorDate: Tue Dec 10 13:29:05 2019 +0100 [FLINK-14926][state-backend-rocksdb] (follow-up) Replace OptionsFactory with RocksDBOptionsFactory OptionsFactory was breaking existing implementations by adding a method to the interface without a default method. To solve this, we keep OptionsFactory but replace it in the RocksDB State Backend with the RocksDBOptionsFactory which has the evolved signature. The OptionsFactory is still accepted and wrapped into a RocksDBOptionsFactory, for compatibility. --- .../state/ConfigurableOptionsFactory.java | 2 +- ...java => ConfigurableRocksDBOptionsFactory.java} | 4 +- .../state/DefaultConfigurableOptionsFactory.java | 6 +- .../contrib/streaming/state/OptionsFactory.java| 51 ++--- .../state/RocksDBConfigurableOptions.java | 4 +- ...ionsFactory.java => RocksDBOptionsFactory.java} | 63 +--- .../state/RocksDBOptionsFactoryAdapter.java| 75 ++ .../streaming/state/RocksDBResourceContainer.java | 8 +- .../streaming/state/RocksDBStateBackend.java | 69 +++-- .../RocksDBOptionsFactoryCompatibilityTest.java| 88 ++ .../contrib/streaming/state/RocksDBResource.java | 6 +- .../state/RocksDBStateBackendConfigTest.java | 13 ++-- 12 files changed, 273 insertions(+), 116 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java index 80d1a1f..343eda8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java +++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java @@ -21,7 +21,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.configuration.Configuration; /** - * An interface for options factory that pick up additional parameters from a configuration. + * @deprecated Replaced by {@link ConfigurableRocksDBOptionsFactory}. */ public interface ConfigurableOptionsFactory extends OptionsFactory { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableRocksDBOptionsFactory.java similarity index 91% copy from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java copy to flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableRocksDBOptionsFactory.java index 80d1a1f..ac7b882 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableRocksDBOptionsFactory.java @@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration; /** * An interface for options factory that pick up additional parameters from a configuration. */ -public interface ConfigurableOptionsFactory extends OptionsFactory { +public interface ConfigurableRocksDBOptionsFactory extends RocksDBOptionsFactory { /** * Creates a variant of the options factory that applies additional configuration parameters. @@ -35,5 +35,5 @@ public interface ConfigurableOptionsFactory extends OptionsFactory { * @param configuration The configuration to pick the values from. 
* @return A reconfigured options factory. */ - OptionsFactory configure(Configuration configuration); + RocksDBOptionsFactory configure(Configuration configuration); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java index 93753b2..aaa9e83 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java @@ -51,10 +51,10 @@ import sta
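The compatibility strategy described above keeps the old interface and wraps its implementations into the evolved one (the role played by `RocksDBOptionsFactoryAdapter` in the file list). A self-contained sketch of that adapter shape; `LegacyFactory`/`EvolvedFactory` are illustrative stand-ins for `OptionsFactory`/`RocksDBOptionsFactory`, with `String` in place of the real option types:

```java
import java.util.Collection;

/**
 * Interface evolution via adapter: old implementations keep compiling,
 * new call sites see only the evolved signature.
 */
class AdapterSketch {
    interface LegacyFactory {
        String create(String currentOptions);
    }

    interface EvolvedFactory {
        String create(String currentOptions, Collection<AutoCloseable> handlesToClose);
    }

    /** Wraps a legacy implementation so it satisfies the evolved signature. */
    static EvolvedFactory adapt(LegacyFactory legacy) {
        // the legacy code never sees (and cannot misuse) the new parameter
        return (currentOptions, handlesToClose) -> legacy.create(currentOptions);
    }
}
```

This is the usual way out when adding a method to a published interface would break user code on a Java version (or API policy) that rules out default methods.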
[flink] 09/11: [FLINK-14926][state-backend-rocksdb] (follow-up) Avoid exposing handles that will not be closed from RocksDBStateBackend
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 31dd8c36246c86db3fb07790158a8c9fe373b726 Author: Stephan Ewen AuthorDate: Tue Dec 10 11:17:47 2019 +0100 [FLINK-14926][state-backend-rocksdb] (follow-up) Avoid exposing handles that will not be closed from RocksDBStateBackend That means that RocksDBStateBackend should not have accessors to gather created native handles. The native handles should only be created once the Resource Container is created. This implies removing some test methods from RocksDBStateBackend and changing the tests to test against the RocksDBResourceContainer instead. --- .../streaming/state/RocksDBResourceContainer.java | 4 ++ .../streaming/state/RocksDBStateBackend.java | 48 +++--- .../state/RocksDBStateBackendConfigTest.java | 59 +- 3 files changed, 46 insertions(+), 65 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index 20c3c5c..199cd84 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -112,6 +112,10 @@ final class RocksDBResourceContainer implements AutoCloseable { return opt; } + PredefinedOptions getPredefinedOptions() { + return predefinedOptions; + } + @Nullable OptionsFactory getOptionsFactory() { return optionsFactory; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 3ffb021..b572deb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -131,9 +131,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu private File[] localRocksDbDirectories; /** The pre-configured option settings. */ + @Nullable private PredefinedOptions predefinedOptions; /** The options factory to create the RocksDB options in the cluster. */ + @Nullable private OptionsFactory optionsFactory; /** This determines if incremental checkpointing is enabled. */ @@ -505,8 +507,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu final OpaqueMemoryResource sharedResources = RocksDBOperationUtils .allocateSharedCachesIfConfigured(memoryConfiguration, env.getMemoryManager(), LOG); - final RocksDBResourceContainer resourceContainer = new RocksDBResourceContainer( - getConfiguredPredefinedOptionsOrDefault(), optionsFactory, sharedResources); + final RocksDBResourceContainer resourceContainer = createOptionsAndResourceContainer(sharedResources); final DBOptions dbOptions = resourceContainer.getDbOptions(); final Function createColumnOptions; @@ -826,34 +827,12 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu * * @return The options factory. */ - @VisibleForTesting + @Nullable public OptionsFactory getOptions() { return optionsFactory; } /** -* Gets the RocksDB {@link DBOptions} to be used for all RocksDB instances. Only for testing. 
-*/ - @VisibleForTesting - public DBOptions getDbOptions() { - RocksDBResourceContainer resourceContainer = new RocksDBResourceContainer( - getConfiguredPredefinedOptionsOrDefault(), - optionsFactory); - return resourceContainer.getDbOptions(); - } - - /** -* Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. Only for testing. -*/ - @VisibleForTesting - public ColumnFamilyOptions getColumnOptions() { - RocksDBResourceContainer resourceContainer = new RocksDBResourceContainer( - getConfigur
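The pattern this commit enforces can be distilled into a short sketch. This is a hypothetical, simplified stand-in (class and method names such as `NativeResourceContainer` and `track` are illustrative, not Flink's real API): native handles are created through the container and closed with it, so the backend never exposes a handle it cannot clean up.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the resource-container pattern: every native handle
// is registered with the container at creation time and released on close().
final class NativeResourceContainer implements AutoCloseable {

    // Handles created through this container; closed when the container closes.
    private final List<AutoCloseable> handlesToClose = new ArrayList<>();

    // Registers a handle for cleanup and hands it back; callers never close it.
    <T extends AutoCloseable> T track(T handle) {
        handlesToClose.add(handle);
        return handle;
    }

    int trackedHandleCount() {
        return handlesToClose.size();
    }

    @Override
    public void close() {
        // Close in reverse creation order; individual failures are swallowed
        // quietly, mirroring how the container treats option handles.
        for (int i = handlesToClose.size() - 1; i >= 0; i--) {
            try {
                handlesToClose.get(i).close();
            } catch (Exception ignored) {
                // quiet close
            }
        }
        handlesToClose.clear();
    }
}
```

With this shape, tests can assert cleanup against the container itself, which is exactly why the commit above moves the `getDbOptions()`/`getColumnOptions()` test accessors off the backend.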
[flink] 06/11: [hotfix][state-backend-rocksdb] Some minor style cleanups in RocksDBResourceContainer
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6955ba77dd11c99cde31c990a4d2c9f8ee57232a Author: Stephan Ewen AuthorDate: Mon Dec 9 18:31:58 2019 +0100 [hotfix][state-backend-rocksdb] Some minor style cleanups in RocksDBResourceContainer - Make visibility consistent package private (was mix of package private and public) - remove unused method - Adjust line spacing between fields - Add some JavaDocs - Make final (not designed for inheritance) - Nullable annotations --- .../streaming/state/RocksDBResourceContainer.java | 21 - 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index 6e0f23c..4cb18d8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -25,25 +25,32 @@ import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.ArrayList; /** * The container for RocksDB resources, including predefined options, option factory and * shared resource among instances. - * - * This should be the only entrance for {@link RocksDBStateBackend} to get RocksDB options, + * + * This should be the only entrance for {@link RocksDBStateBackend} to get RocksDB options, * and should be properly (and necessarily) closed to prevent resource leak. 
*/ -public class RocksDBResourceContainer implements AutoCloseable { +final class RocksDBResourceContainer implements AutoCloseable { /** The pre-configured option settings. */ private PredefinedOptions predefinedOptions; + /** The options factory to create the RocksDB options. */ + @Nullable private OptionsFactory optionsFactory; - /** The shared resource among RocksDB instances. */ + + /** The shared resource among RocksDB instances. This resource is not part of the 'handlesToClose', +* because the handles to close are closed quietly, whereas for this one, we want exceptions to be reported. */ + @Nullable private OpaqueMemoryResource sharedResources; + /** The handles to be closed when the container is closed. */ private final ArrayList handlesToClose; public RocksDBResourceContainer() { @@ -71,7 +78,7 @@ public class RocksDBResourceContainer implements AutoCloseable { /** * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. */ - public ColumnFamilyOptions getColumnOptions() { + ColumnFamilyOptions getColumnOptions() { // initial options from pre-defined profile ColumnFamilyOptions opt = checkAndGetPredefinedOptions().createColumnOptions(handlesToClose); @@ -83,10 +90,6 @@ public class RocksDBResourceContainer implements AutoCloseable { return opt; } - PredefinedOptions getPredefinedOptions() { - return predefinedOptions; - } - PredefinedOptions checkAndGetPredefinedOptions() { if (predefinedOptions == null) { predefinedOptions = PredefinedOptions.DEFAULT;
[flink] branch master updated (eddad99 -> 07f08fb)
This is an automated email from the ASF dual-hosted git repository. sewen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from eddad99 [FLINK-15008][legal] Update licensing new e05618b [FLINK-14926][state-backend-rocksdb] Ensure that RocksObjects are always closed on backend disposal new 6560929 [hotfix][tests] Remove unnecessary temp directory handling new 08554b9 [hotfix][tests] Use assumptions rather than manual checks in RocksDBStateBackendConfigTest to report skipped tests properly new 3820bdd [hotfix][tests] Ensure RocksDB native library is loaded into temp directory new fdc5ed5 [FLINK-15177][state-backend-rocksdb] Migrate RocksDB Configurable Options to new type safe config options new 6955ba7 [hotfix][state-backend-rocksdb] Some minor style cleanups in RocksDBResourceContainer new 0602ccf [FLINK-14926][state-backend-rocksdb] (follow-up) Make RocksDBResourceContainer immutable new 93deb1a [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify test to rely on RocksDBResourceContainer for cleanup of native handles new 31dd8c3 [FLINK-14926][state-backend-rocksdb] (follow-up) Avoid exposing handles that will not be closed from RocksDBStateBackend new 2405111 [FLINK-14926][state-backend-rocksdb] (follow-up) Simplify collecting the option objects to close new 07f08fb [FLINK-14926][state-backend-rocksdb] (follow-up) Replace OptionsFactory with RocksDBOptionsFactory The 11 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
Summary of changes: .../state/ConfigurableOptionsFactory.java | 2 +- ...java => ConfigurableRocksDBOptionsFactory.java} | 4 +- .../state/DefaultConfigurableOptionsFactory.java | 105 +++- .../contrib/streaming/state/OptionsFactory.java| 21 +--- .../contrib/streaming/state/PredefinedOptions.java | 48 ++-- .../state/RocksDBConfigurableOptions.java | 40 -- .../streaming/state/RocksDBKeyedStateBackend.java | 11 +- .../state/RocksDBKeyedStateBackendBuilder.java | 23 ++-- ...ionsFactory.java => RocksDBOptionsFactory.java} | 49 ++-- .../state/RocksDBOptionsFactoryAdapter.java| 75 .../streaming/state/RocksDBResourceContainer.java | 135 + .../streaming/state/RocksDBStateBackend.java | 130 ++-- .../RocksDBOptionsFactoryCompatibilityTest.java| 88 ++ .../contrib/streaming/state/RocksDBResource.java | 26 ++-- .../state/RocksDBResourceContainerTest.java| 132 .../state/RocksDBStateBackendConfigTest.java | 134 +--- .../streaming/state/RocksDBStateBackendTest.java | 27 ++--- .../contrib/streaming/state/RocksDBTestUtils.java | 13 +- 18 files changed, 757 insertions(+), 306 deletions(-) copy flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/{ConfigurableOptionsFactory.java => ConfigurableRocksDBOptionsFactory.java} (91%) copy flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/{OptionsFactory.java => RocksDBOptionsFactory.java} (62%) create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryCompatibilityTest.java create mode 100644 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
[flink] 02/11: [hotfix][tests] Remove unnecessary temp directory handling
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6560929dba26c1a725bb7f8e8f35e3b5dafef42f Author: Stephan Ewen AuthorDate: Mon Dec 9 20:18:17 2019 +0100 [hotfix][tests] Remove unnecessary temp directory handling Removes the use of an unneeded temp directory and relies on JUnit tooling rather than manual cleanup. --- .../contrib/streaming/state/RocksDBStateBackendTest.java | 12 ++-- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index be99c51..e59f6e1 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -114,16 +114,13 @@ public class RocksDBStateBackendTest extends StateBackendTestBase handlesToClose = new ArrayList<>(); public void prepareRocksDB() throws Exception { - instanceBasePath = tempFolder.newFolder(); - instanceBasePath.mkdirs(); - String dbPath = new File(instanceBasePath, DB_INSTANCE_DIR_STRING).getAbsolutePath(); + String dbPath = new File(tempFolder.newFolder(), DB_INSTANCE_DIR_STRING).getAbsolutePath(); columnOptions = PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose); optionsContainer = new RocksDBResourceContainer(); ArrayList columnFamilyHandles = new ArrayList<>(1); @@ -170,11 +167,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase
[flink] 05/11: [FLINK-15177][state-backend-rocksdb] Migrate RocksDB Configurable Options to new type safe config options
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit fdc5ed546360fe438f81c564265f8d623f88f02e Author: Stephan Ewen AuthorDate: Tue Dec 10 12:19:59 2019 +0100 [FLINK-15177][state-backend-rocksdb] Migrate RocksDB Configurable Options to new type safe config options This also simplifies the validation logic. --- .../state/DefaultConfigurableOptionsFactory.java | 94 +- .../state/RocksDBConfigurableOptions.java | 36 ++--- .../state/RocksDBStateBackendConfigTest.java | 24 +++--- 3 files changed, 75 insertions(+), 79 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java index 12c0eb9..93753b2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java @@ -32,12 +32,11 @@ import org.rocksdb.TableFormatConfig; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_CACHE_SIZE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.BLOCK_SIZE; @@ -57,6 +56,8 @@ import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOption */ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFactory { + private static final long 
serialVersionUID = 1L; + private final Map configuredOptions = new HashMap<>(); @Override @@ -131,7 +132,7 @@ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFac return new HashMap<>(configuredOptions); } - private boolean isOptionConfigured(ConfigOption configOption) { + private boolean isOptionConfigured(ConfigOption configOption) { return configuredOptions.containsKey(configOption.key()); } @@ -300,44 +301,37 @@ public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFac return this; } - private static final String[] CANDIDATE_CONFIGS = new String[]{ + private static final ConfigOption[] CANDIDATE_CONFIGS = new ConfigOption[] { // configurable DBOptions - MAX_BACKGROUND_THREADS.key(), - MAX_OPEN_FILES.key(), + MAX_BACKGROUND_THREADS, + MAX_OPEN_FILES, // configurable ColumnFamilyOptions - COMPACTION_STYLE.key(), - USE_DYNAMIC_LEVEL_SIZE.key(), - TARGET_FILE_SIZE_BASE.key(), - MAX_SIZE_LEVEL_BASE.key(), - WRITE_BUFFER_SIZE.key(), - MAX_WRITE_BUFFER_NUMBER.key(), - MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(), - BLOCK_SIZE.key(), - BLOCK_CACHE_SIZE.key() + COMPACTION_STYLE, + USE_DYNAMIC_LEVEL_SIZE, + TARGET_FILE_SIZE_BASE, + MAX_SIZE_LEVEL_BASE, + WRITE_BUFFER_SIZE, + MAX_WRITE_BUFFER_NUMBER, + MIN_WRITE_BUFFER_NUMBER_TO_MERGE, + BLOCK_SIZE, + BLOCK_CACHE_SIZE }; - private static final Set POSITIVE_INT_CONFIG_SET = new HashSet<>(Arrays.asList( - MAX_BACKGROUND_THREADS.key(), - MAX_WRITE_BUFFER_NUMBER.key(), - MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key() - )); - - private static final Set SIZE_CONFIG_SET = new HashSet<>(Arrays.asList( - TARGET_FILE_SIZE_BASE.key(), - MAX_SIZE_LEVEL_BASE.key(), - WRITE_BUFFER_SIZE.key(), - BLOCK_SIZE.key(), - BLOCK_CACHE_SIZE.key() + private static final Set> POSITIVE_INT_CONFIG_SET = new HashSet<>(Arrays.asList( + MAX_BACKGROUND_THREADS, + MAX_WRITE_BUFFER_NUMBER, + MIN_WRITE_BUFFER_NUMBER_TO_MERGE )); - private static final Set BOOLEAN_CONFIG_SET = new HashSet<>(Collections.singletonList( - 
USE_DYNAMIC_LEVEL_SIZE.key() + private static final Set> SIZE_CONFIG_SET = new HashSet<>(Arrays.asList( + TARGET_FILE_SIZE_BASE, + MAX_SIZE_LEVEL_BASE, + WRITE_BUFFER_SIZE, +
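The gist of the migration in the diff above is replacing `String`-keyed validation sets with sets keyed by typed `ConfigOption` objects, so an option's key and value type travel together. A minimal sketch of that idea, using a simplified stand-in for Flink's real `ConfigOption` class (the option keys below are illustrative, not Flink's actual keys):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Simplified stand-in for Flink's ConfigOption: the key and the value type
// are carried by one object instead of a bare string.
final class ConfigOption<T> {
    private final String key;

    ConfigOption(String key) {
        this.key = key;
    }

    String key() {
        return key;
    }
}

final class ConfigurableOptions {
    // Illustrative options; the real ones live in RocksDBConfigurableOptions.
    static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
            new ConfigOption<>("state.backend.rocksdb.thread.num");
    static final ConfigOption<Integer> MAX_WRITE_BUFFER_NUMBER =
            new ConfigOption<>("state.backend.rocksdb.writebuffer.count");

    // Typed validation set: membership is checked against options, not strings.
    static final Set<ConfigOption<?>> POSITIVE_INT_OPTIONS =
            new HashSet<>(Arrays.asList(MAX_BACKGROUND_THREADS, MAX_WRITE_BUFFER_NUMBER));

    private final Map<String, String> configuredOptions = new HashMap<>();

    void setString(ConfigOption<?> option, String value) {
        // Validation happens at the option level, simplifying the check logic.
        if (POSITIVE_INT_OPTIONS.contains(option) && Integer.parseInt(value) <= 0) {
            throw new IllegalArgumentException(option.key() + " must be positive");
        }
        configuredOptions.put(option.key(), value);
    }

    Optional<String> get(ConfigOption<?> option) {
        return Optional.ofNullable(configuredOptions.get(option.key()));
    }
}
```

Keying the sets by `ConfigOption<?>` instead of `option.key()` is what lets the commit drop the parallel string-based `CANDIDATE_CONFIGS` bookkeeping.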
[flink-web] branch asf-site updated (20e5a8b -> b69c6fe)
This is an automated email from the ASF dual-hosted git repository. fhueske pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 20e5a8b Rebuild website new d23026a [FLINK-14213] Replace link to "Local Setup Tutorial" by link to "Getting Started Overview". new b69c6fe Rebuild website The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: _data/i18n.yml | 6 -- _includes/navbar.html | 4 ++-- content/2019/05/03/pulsar-flink.html| 4 ++-- content/2019/05/14/temporal-tables.html | 4 ++-- content/2019/05/19/state-ttl.html | 4 ++-- content/2019/06/05/flink-network-stack.html | 4 ++-- content/2019/06/26/broadcast-state.html | 4 ++-- content/2019/07/23/flink-network-stack-2.html | 4 ++-- content/blog/index.html | 4 ++-- content/blog/page10/index.html | 4 ++-- content/blog/page2/index.html | 4 ++-- content/blog/page3/index.html | 4 ++-- content/blog/page4/index.html | 4 ++-- content/blog/page5/index.html | 4 ++-- content/blog/page6/index.html | 4 ++-- content/blog/page7/index.html | 4 ++-- content/blog/page8/index.html | 4 ++-- content/blog/page9/index.html | 4 ++-- content/blog/release_1.0.0-changelog_known_issues.html | 4 ++-- content/blog/release_1.1.0-changelog.html | 4 ++-- content/blog/release_1.2.0-changelog.html | 4 ++-- content/blog/release_1.3.0-changelog.html | 4 ++-- content/community.html | 4 ++-- content/contributing/code-style-and-quality-common.html | 4 ++-- content/contributing/code-style-and-quality-components.html | 4 ++-- content/contributing/code-style-and-quality-formatting.html | 4 ++-- content/contributing/code-style-and-quality-java.html | 4 ++-- content/contributing/code-style-and-quality-preamble.html | 4 ++-- content/contributing/code-style-and-quality-pull-requests.html | 4 ++-- 
content/contributing/code-style-and-quality-scala.html | 4 ++-- content/contributing/contribute-code.html | 4 ++-- content/contributing/contribute-documentation.html | 4 ++-- content/contributing/how-to-contribute.html | 4 ++-- content/contributing/improve-website.html | 4 ++-- content/contributing/reviewing-prs.html | 4 ++-- content/documentation.html | 4 ++-- content/downloads.html | 4 ++-- content/ecosystem.html | 4 ++-- content/faq.html| 4 ++-- content/feature/2019/09/13/state-processor-api.html | 4 ++-- content/features/2017/07/04/flink-rescalable-state.html | 4 ++-- content/features/2018/01/30/incremental-checkpointing.html | 4 ++-- .../features/2018/03/01/end-to-end-exactly-once-apache-flink.html | 4 ++-- content/features/2019/03/11/prometheus-monitoring.html | 4 ++-- content/flink-applications.html | 4 ++-- content/flink-architecture.html | 4 ++-- content/flink-operations.html | 4 ++-- content/gettinghelp.html| 4 ++-- content/index.html | 4 ++-- content/material.html | 4 ++-- content/news/2014/08/26/release-0.6.html| 4 ++-- content/news/2014/09/26/release-0.6.1.html | 4 ++-- content/news/2014/10/03/upcoming_events.html| 4 ++-- content/news/2014/11/04/release-0.7.0.html | 4 ++-- content/news/2014/11/18/hadoop-compatibility.html | 4 ++-- content/news/2015/01/06/december-in-flink.html | 4 ++-- content/news/2015
[flink-web] 01/02: [FLINK-14213] Replace link to "Local Setup Tutorial" by link to "Getting Started Overview".
This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit d23026ac16470157ad8b50e1d753f1471dae593e Author: Fabian Hueske AuthorDate: Wed Sep 25 15:19:40 2019 +0200 [FLINK-14213] Replace link to "Local Setup Tutorial" by link to "Getting Started Overview". This closes #272. --- _data/i18n.yml| 6 -- _includes/navbar.html | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/_data/i18n.yml b/_data/i18n.yml index 9db8599..6c43ccf 100644 --- a/_data/i18n.yml +++ b/_data/i18n.yml @@ -7,7 +7,7 @@ en: powered_by: Powered By faq: FAQ downloads: Downloads -tutorials: Tutorials +getting_started: Getting Started documentation: Documentation getting_help: Getting Help ecosystem: Ecosystem @@ -20,6 +20,7 @@ en: contribute_docs: Contribute Documentation contribute_website: Contribute to the Website roadmap: Roadmap +tutorials: Tutorials zh: what_is_flink: Apache Flink 是什么? @@ -30,7 +31,7 @@ zh: powered_by: Flink 用户 faq: 常见问题 downloads: 下载 -tutorials: 教程 +getting_started: 教程 documentation: 文档 getting_help: 获取帮助 ecosystem: Ecosystem @@ -43,3 +44,4 @@ zh: contribute_docs: 贡献文档 contribute_website: 贡献网站 roadmap: 开发计划 +tutorials: 教程 \ No newline at end of file diff --git a/_includes/navbar.html b/_includes/navbar.html index 71539bd..3acf26e 100755 --- a/_includes/navbar.html +++ b/_includes/navbar.html @@ -63,9 +63,9 @@ {{ site.data.i18n[page.language].downloads }} - + - {{ site.data.i18n[page.language].tutorials }} + {{ site.data.i18n[page.language].getting_started }}
[flink] branch release-1.10 updated: [FLINK-15008][build] Bundle javax.activation-api for Java 11
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 97899cf [FLINK-15008][build] Bundle javax.activation-api for Java 11 97899cf is described below commit 97899cf0a76e163820e1d32f9ba715e742315012 Author: Chesnay Schepler AuthorDate: Fri Dec 6 17:53:50 2019 +0100 [FLINK-15008][build] Bundle javax.activation-api for Java 11 --- flink-dist/pom.xml | 30 - flink-dist/src/main/resources/META-INF/NOTICE | 3 +- .../META-INF/licenses/LICENSE.javax.activation | 135 + .../src/main/resources/META-INF/NOTICE | 4 +- pom.xml| 12 +- 5 files changed, 175 insertions(+), 9 deletions(-) diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 11255cb..2442f6f 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -398,6 +398,16 @@ under the License. provided + + + javax.activation + javax.activation-api + 1.2.0 + + + provided + + @@ -580,7 +590,7 @@ under the License. maven-dependency-plugin - copy-jaxb-jar + copy-javax-jars process-resources copy @@ -596,6 +606,13 @@ under the License. jar true + + javax.activation + javax.activation-api + ${javax.activation.api.version} + jar + true + ${project.build.directory}/temporary @@ -606,16 +623,19 @@ under the License. maven-antrun-plugin - unpack-jaxb-library + unpack-javax-libraries process-resources run - - + + + + + + diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE index 2714eda..be9dd91 100644 --- a/flink-dist/src/main/resources/META-INF/NOTICE +++ b/flink-dist/src/main/resources/META-INF/NOTICE @@ -53,7 +53,8 @@ See bundled license files for details. This project bundles the following dependencies under the CDDL 1.1 license. See bundled license files for details. 
-- javax.xml.bind:jaxb-api:2.3.0 +- javax.activation:javax.activation-api:1.2.0 +- javax.xml.bind:jaxb-api:2.3.1 This project bundles "org.tukaani:xz:1.5". This Java implementation of XZ has been put into the public domain, thus you can do diff --git a/flink-dist/src/main/resources/META-INF/licenses/LICENSE.javax.activation b/flink-dist/src/main/resources/META-INF/licenses/LICENSE.javax.activation new file mode 100644 index 000..fd16ea9 --- /dev/null +++ b/flink-dist/src/main/resource
[flink] branch master updated (2c11291 -> 886bf6f)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2c11291 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests add 886bf6f [FLINK-15008][build] Bundle javax.activation-api for Java 11 No new revisions were added by this update. Summary of changes: flink-dist/pom.xml | 30 ++ .../{LICENSE.jaxb => LICENSE.javax.activation} | 0 pom.xml| 12 - 3 files changed, 36 insertions(+), 6 deletions(-) copy flink-dist/src/main/resources/META-INF/licenses/{LICENSE.jaxb => LICENSE.javax.activation} (100%)
[flink] branch release-1.8 updated: [FLINK-14951][tests] Harden the thread safety of State TTL backend tests
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new dfcbf68 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests dfcbf68 is described below commit dfcbf68dbd792fc27c997078be7a59a594005d8b Author: Yangze Guo AuthorDate: Tue Dec 10 10:03:49 2019 +0800 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests --- .../streaming/tests/MonotonicTTLTimeProvider.java | 36 ++ .../streaming/tests/TtlVerifyUpdateFunction.java | 21 + 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java index 24bc9dc..c2e66f1 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java @@ -19,12 +19,15 @@ package org.apache.flink.streaming.tests; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.function.FunctionWithException; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkState; + /** * A stub implementation of a {@link TtlTimeProvider} which guarantees that * processing time increases monotonically. 
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable { private static final Object lock = new Object(); @GuardedBy("lock") - static long freeze() { + static T doWithFrozenTime(FunctionWithException action) throws E { synchronized (lock) { - if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) { - timeIsFrozen = true; - return getCurrentTimestamp(); - } else { - return lastReturnedProcessingTime; - } + final long timestampBeforeUpdate = freeze(); + T result = action.apply(timestampBeforeUpdate); + final long timestampAfterUpdate = unfreezeTime(); + + checkState(timestampAfterUpdate == timestampBeforeUpdate, + "Timestamps before and after the update do not match."); + return result; + } + } + + private static long freeze() { + if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) { + timeIsFrozen = true; + return getCurrentTimestamp(); + } else { + return lastReturnedProcessingTime; } } @@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable { return lastReturnedProcessingTime; } - @GuardedBy("lock") - static long unfreezeTime() { - synchronized (lock) { - timeIsFrozen = false; - return lastReturnedProcessingTime; - } + private static long unfreezeTime() { + timeIsFrozen = false; + return lastReturnedProcessingTime; } } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java index 94e9dbd..ed69171 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java @@ -47,7 +47,6 @@ import java.util.stream.Collectors; import 
java.util.stream.StreamSupport; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * Update state with TTL for each verifier. @@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction verifier, Object update) throws Exception { - final long timestampBeforeUpdate = MonotonicTTLTimeProvider.freeze(); - State state = states.get(verifier.getId(
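The refactoring in the diff above can be distilled into the following sketch, using a simplified stand-in for `MonotonicTTLTimeProvider` (class and field names here are illustrative): `freeze()`/`unfreezeTime()` become private, and callers go through a single `doWithFrozenTime(...)` that freezes the clock, runs the state update with the frozen timestamp, unfreezes, and verifies the timestamp did not move, all under one lock.

```java
// Simplified stand-in showing the hardened frozen-time pattern: the
// freeze/act/unfreeze sequence is one atomic unit instead of two public
// methods that tests had to pair up correctly themselves.
final class FrozenTimeSketch {
    private static final Object lock = new Object();
    private static boolean timeIsFrozen = false;
    private static long lastReturnedProcessingTime = Long.MIN_VALUE;

    interface FunctionWithException<T, R, E extends Throwable> {
        R apply(T value) throws E;
    }

    static <T, E extends Throwable> T doWithFrozenTime(
            FunctionWithException<Long, T, E> action) throws E {
        synchronized (lock) {
            final long before = freeze();
            T result = action.apply(before);
            final long after = unfreeze();
            if (after != before) {
                throw new IllegalStateException(
                        "Timestamps before and after the update do not match.");
            }
            return result;
        }
    }

    private static long freeze() {
        if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) {
            timeIsFrozen = true;
            // Monotonic: never hand out a timestamp smaller than the last one.
            lastReturnedProcessingTime =
                    Math.max(lastReturnedProcessingTime + 1, System.currentTimeMillis());
        }
        return lastReturnedProcessingTime;
    }

    private static long unfreeze() {
        timeIsFrozen = false;
        return lastReturnedProcessingTime;
    }
}
```

Because the whole sequence runs inside `synchronized (lock)`, a concurrent caller can no longer observe a frozen clock between an update's freeze and unfreeze, which is the thread-safety hole the commit closes.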
[flink] branch release-1.9 updated: [FLINK-14951][tests] Harden the thread safety of State TTL backend tests
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new 8a73d68 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests 8a73d68 is described below commit 8a73d680869da0d7bb4d543bfc197d01f3b0e068 Author: Yangze Guo AuthorDate: Tue Dec 10 10:03:49 2019 +0800 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests --- .../streaming/tests/MonotonicTTLTimeProvider.java | 36 ++ .../streaming/tests/TtlVerifyUpdateFunction.java | 21 + 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java index 24bc9dc..c2e66f1 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java @@ -19,12 +19,15 @@ package org.apache.flink.streaming.tests; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.function.FunctionWithException; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkState; + /** * A stub implementation of a {@link TtlTimeProvider} which guarantees that * processing time increases monotonically. 
@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable { private static final Object lock = new Object(); @GuardedBy("lock") - static long freeze() { + static T doWithFrozenTime(FunctionWithException action) throws E { synchronized (lock) { - if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) { - timeIsFrozen = true; - return getCurrentTimestamp(); - } else { - return lastReturnedProcessingTime; - } + final long timestampBeforeUpdate = freeze(); + T result = action.apply(timestampBeforeUpdate); + final long timestampAfterUpdate = unfreezeTime(); + + checkState(timestampAfterUpdate == timestampBeforeUpdate, + "Timestamps before and after the update do not match."); + return result; + } + } + + private static long freeze() { + if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) { + timeIsFrozen = true; + return getCurrentTimestamp(); + } else { + return lastReturnedProcessingTime; } } @@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable { return lastReturnedProcessingTime; } - @GuardedBy("lock") - static long unfreezeTime() { - synchronized (lock) { - timeIsFrozen = false; - return lastReturnedProcessingTime; - } + private static long unfreezeTime() { + timeIsFrozen = false; + return lastReturnedProcessingTime; } } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java index 94e9dbd..ed69171 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java @@ -47,7 +47,6 @@ import java.util.stream.Collectors; import 
java.util.stream.StreamSupport; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * Update state with TTL for each verifier. @@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction verifier, Object update) throws Exception { - final long timestampBeforeUpdate = MonotonicTTLTimeProvider.freeze(); - State state = states.get(verifier.getId(
[flink] branch master updated (5c89d12 -> 2c11291)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 5c89d12 [hotfix][doc] update Hive functions page with more info add 2c11291 [FLINK-14951][tests] Harden the thread safety of State TTL backend tests No new revisions were added by this update. Summary of changes: .../streaming/tests/MonotonicTTLTimeProvider.java | 36 ++ .../streaming/tests/TtlVerifyUpdateFunction.java | 21 + 2 files changed, 30 insertions(+), 27 deletions(-)
[flink-web] branch asf-site updated (6c7c3f1 -> 20e5a8b)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 6c7c3f1 Rebuild page new ff40e10 Add Vip to powered by page new 20e5a8b Rebuild website The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: content/img/poweredby/vip-logo.png | Bin 0 -> 21680 bytes content/index.html | 6 ++ content/poweredby.html | 4 content/zh/index.html | 6 ++ content/zh/poweredby.html | 4 img/poweredby/vip-logo.png | Bin 0 -> 21680 bytes index.md | 6 ++ index.zh.md| 6 ++ poweredby.md | 4 poweredby.zh.md| 4 10 files changed, 40 insertions(+) create mode 100644 content/img/poweredby/vip-logo.png create mode 100644 img/poweredby/vip-logo.png
[flink-web] 01/02: Add Vip to powered by page
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit ff40e108cc801d75cec71a143fe7638482f329f6
Author: bowen.li
AuthorDate: Mon Dec 2 12:19:07 2019 -0800

    Add Vip to powered by page

    This closes #286
---
 img/poweredby/vip-logo.png | Bin 0 -> 21680 bytes
 index.md                   | 6 ++
 index.zh.md                | 6 ++
 poweredby.md               | 4
 poweredby.zh.md            | 4
 5 files changed, 20 insertions(+)

diff --git a/img/poweredby/vip-logo.png b/img/poweredby/vip-logo.png
new file mode 100644
index 000..532d0bb
Binary files /dev/null and b/img/poweredby/vip-logo.png differ

diff --git a/index.md b/index.md
index 1239234..8698fb2 100644
--- a/index.md
+++ b/index.md
@@ -308,6 +308,12 @@ layout: base
+
+
+
+
+
+

diff --git a/index.zh.md b/index.zh.md
index 36a7131..22e809b 100644
--- a/index.zh.md
+++ b/index.zh.md
@@ -295,6 +295,12 @@ layout: base
+
+
+
+
+
+

diff --git a/poweredby.md b/poweredby.md
index ad606c0..b2680b2 100644
--- a/poweredby.md
+++ b/poweredby.md
@@ -134,6 +134,10 @@ If you would you like to be included on this page, please reach out to the [Flin
   Uber built their internal SQL-based, open-source streaming analytics platform AthenaX on Apache Flink. <a href="https://eng.uber.com/athenax/" target='_blank'> Read more on the Uber engineering blog</a>
+
+  Vip, one of the largest warehouse sale website for big brands in China, uses Flink to stream and ETL data into Apache Hive in real-time for data processing and analytics. <a href="https://yq.aliyun.com/articles/652548" target='_blank'> Read more about Vip's story.</a>
+
   Xiaomi, one of the largest electronics companies in China, built a platform with Flink to improve the efficiency of developing and operating real-time applications and use it in real-time recommendations. <a href="https://files.alicdn.com/tpsservice/d77d3ed3f2709790f0d84f4ec279a486.pdf" target='_blank'> Learn more about how Xiaomi is using Flink.</a>

diff --git a/poweredby.zh.md b/poweredby.zh.md
index 1ca3741..9812094 100644
--- a/poweredby.zh.md
+++ b/poweredby.zh.md
@@ -127,6 +127,10 @@ Apache Flink 为全球许多公司和企业的关键业务提供支持。在这
   Uber 在 Apache Flink 上构建了基于 SQL 的开源流媒体分析平台 AthenaX。<a href="https://eng.uber.com/athenax/" target='_blank'> 更多信息请访问Uber工程博客</a>
+
+  Vip,中国最大的品牌特卖网站之一,应用Flink实时的将数据流ETL到Hive中用于数据处理和分析. <a href="https://yq.aliyun.com/articles/652548" target='_blank'> 详细了解Vip如何使用 Flink</a>
+
   小米,作为中国最大的专注于硬件与软件开发的公司之一,利用 Flink 构建了一个内部平台,以提高开发运维实时应用程序的效率,并用于实时推荐等场景。<a href="https://files.alicdn.com/tpsservice/d77d3ed3f2709790f0d84f4ec279a486.pdf" target='_blank'> 详细了解小米如何使用 Flink 的。</a>
[flink-web] 02/02: Rebuild website
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 20e5a8b7d41c51edf71c0177812e5874580cbd06
Author: Jark Wu
AuthorDate: Wed Dec 11 16:25:15 2019 +0800

    Rebuild website
---
 content/img/poweredby/vip-logo.png | Bin 0 -> 21680 bytes
 content/index.html                 | 6 ++
 content/poweredby.html             | 4
 content/zh/index.html              | 6 ++
 content/zh/poweredby.html          | 4
 5 files changed, 20 insertions(+)

diff --git a/content/img/poweredby/vip-logo.png b/content/img/poweredby/vip-logo.png
new file mode 100644
index 000..532d0bb
Binary files /dev/null and b/content/img/poweredby/vip-logo.png differ

diff --git a/content/index.html b/content/index.html
index 8a07d08..e846c4d 100644
--- a/content/index.html
+++ b/content/index.html
@@ -482,6 +482,12 @@
+
+
+
+
+
+

diff --git a/content/poweredby.html b/content/poweredby.html
index 253a21f..5e6a1ad 100644
--- a/content/poweredby.html
+++ b/content/poweredby.html
@@ -312,6 +312,10 @@
   Uber built their internal SQL-based, open-source streaming analytics platform AthenaX on Apache Flink. <a href="https://eng.uber.com/athenax/" target="_blank"> Read more on the Uber engineering blog</a>
+
+  Vip, one of the largest warehouse sale website for big brands in China, uses Flink to stream and ETL data into Apache Hive in real-time for data processing and analytics. <a href="https://yq.aliyun.com/articles/652548" target="_blank"> Read more about Vip's story.</a>
+
   Xiaomi, one of the largest electronics companies in China, built a platform with Flink to improve the efficiency of developing and operating real-time applications and use it in real-time recommendations. <a href="https://files.alicdn.com/tpsservice/d77d3ed3f2709790f0d84f4ec279a486.pdf" target="_blank"> Learn more about how Xiaomi is using Flink.</a>

diff --git a/content/zh/index.html b/content/zh/index.html
index d1c0b2c..758724a 100644
--- a/content/zh/index.html
+++ b/content/zh/index.html
@@ -467,6 +467,12 @@
+
+
+
+
+
+

diff --git a/content/zh/poweredby.html b/content/zh/poweredby.html
index c38dbf4..f98df9e 100644
--- a/content/zh/poweredby.html
+++ b/content/zh/poweredby.html
@@ -303,6 +303,10 @@
   Uber 在 Apache Flink 上构建了基于 SQL 的开源流媒体分析平台 AthenaX。<a href="https://eng.uber.com/athenax/" target="_blank"> 更多信息请访问Uber工程博客</a>
+
+  Vip,中国最大的品牌特卖网站之一,应用Flink实时的将数据流ETL到Hive中用于数据处理和分析. <a href="https://yq.aliyun.com/articles/652548" target="_blank"> 详细了解Vip如何使用 Flink</a>
+
   小米,作为中国最大的专注于硬件与软件开发的公司之一,利用 Flink 构建了一个内部平台,以提高开发运维实时应用程序的效率,并用于实时推荐等场景。<a href="https://files.alicdn.com/tpsservice/d77d3ed3f2709790f0d84f4ec279a486.pdf" target="_blank"> 详细了解小米如何使用 Flink 的。</a>