[flink] branch master updated (ccb29e9 -> d33fb62)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ccb29e9 [FLINK-16804] Deprecate all Table methods that accept expressions as a String add d33fb62 [FLINK-17385][jdbc][postgres] Handled problem of numeric with 0 precision No new revisions were added by this update. Summary of changes: .../api/java/io/jdbc/catalog/PostgresCatalog.java | 18 +++--- .../java/io/jdbc/catalog/PostgresCatalogITCase.java| 3 ++- .../java/io/jdbc/catalog/PostgresCatalogTestBase.java | 11 +-- 3 files changed, 26 insertions(+), 6 deletions(-)
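For context on the FLINK-17385 fix above: Postgres reports a `numeric` column declared without an explicit precision as having precision 0, which a fixed-precision `DECIMAL` mapping cannot accept as-is. A minimal stdlib sketch of the general shape of such a workaround — the method name and the `DECIMAL(38, 18)` fallback are illustrative assumptions, not the actual `PostgresCatalog` logic:

```java
public class NumericTypeMapping {

    // Map a reported (precision, scale) pair to a DECIMAL type string.
    // When the driver reports precision 0 for an unconstrained NUMERIC
    // column, substitute a wide default instead of an invalid DECIMAL(0, 0).
    static String toFlinkDecimal(int precision, int scale) {
        if (precision == 0) {
            return "DECIMAL(38, 18)"; // assumed fallback, for illustration only
        }
        return "DECIMAL(" + precision + ", " + scale + ")";
    }

    public static void main(String[] args) {
        System.out.println(toFlinkDecimal(10, 2)); // DECIMAL(10, 2)
        System.out.println(toFlinkDecimal(0, 0));  // DECIMAL(38, 18)
    }
}
```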
[flink] branch master updated: [FLINK-17333][doc] add doc for 'create catalog' ddl
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f6f2fb5 [FLINK-17333][doc] add doc for 'create catalog' ddl f6f2fb5 is described below commit f6f2fb56c3ee1d801f0db62fed357f918d40d4d0 Author: bowen.li AuthorDate: Wed Apr 22 22:05:54 2020 -0700 [FLINK-17333][doc] add doc for 'create catalog' ddl closes #11871 --- docs/dev/table/sql/create.md| 18 ++ docs/dev/table/sql/create.zh.md | 18 ++ 2 files changed, 36 insertions(+) diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md index 6e41eac..e44e2bc 100644 --- a/docs/dev/table/sql/create.md +++ b/docs/dev/table/sql/create.md @@ -210,6 +210,24 @@ The key and value of expression `key1=val1` should both be string literal. See d {% top %} +## CREATE CATALOG + +{% highlight sql %} +CREATE CATALOG catalog_name + WITH (key1=val1, key2=val2, ...) +{% endhighlight %} + +Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown. + +**WITH OPTIONS** + +Catalog properties used to store extra information related to this catalog. +The key and value of expression `key1=val1` should both be string literals. + +Check out more details at [Catalogs]({{ site.baseurl }}/dev/table/catalogs.html). + +{% top %} + ## CREATE DATABASE {% highlight sql %} diff --git a/docs/dev/table/sql/create.zh.md b/docs/dev/table/sql/create.zh.md index a696912..5c6099a 100644 --- a/docs/dev/table/sql/create.zh.md +++ b/docs/dev/table/sql/create.zh.md @@ -210,6 +210,24 @@ CREATE TABLE Orders ( {% top %} +## CREATE CATALOG + +{% highlight sql %} +CREATE CATALOG catalog_name + WITH (key1=val1, key2=val2, ...) +{% endhighlight %} + +Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown. 
+ +**WITH OPTIONS** + +Catalog properties used to store extra information related to this catalog. +The key and value of expression `key1=val1` should both be string literals. + +Check out more details at [Catalogs]({{ site.baseurl }}/dev/table/catalogs.html). + +{% top %} + ## CREATE DATABASE {% highlight sql %}
[flink] branch master updated: [FLINK-16812][jdbc] support array types in PostgresRowConverter
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4121292 [FLINK-16812][jdbc] support array types in PostgresRowConverter 4121292 is described below commit 4121292fbb63dacef29245a2234da68fa499efa6 Author: bowen.li AuthorDate: Wed Apr 15 22:26:22 2020 -0700 [FLINK-16812][jdbc] support array types in PostgresRowConverter closes #11766 --- .../row/converter/AbstractJDBCRowConverter.java| 5 +- .../source/row/converter/JDBCRowConverter.java | 2 +- .../source/row/converter/PostgresRowConverter.java | 43 +++ .../io/jdbc/catalog/PostgresCatalogITCase.java | 26 + .../java/io/jdbc/catalog/PostgresCatalogTest.java | 9 +++- .../io/jdbc/catalog/PostgresCatalogTestBase.java | 62 ++ 6 files changed, 144 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java index 06a6329..abe753f 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java @@ -54,7 +54,10 @@ public abstract class AbstractJDBCRowConverter implements JDBCRowConverter { return reuse; } - private JDBCFieldConverter createConverter(LogicalType type) { + /** +* Create a runtime JDBC field converter from given {@link LogicalType}. 
+*/ + public JDBCFieldConverter createConverter(LogicalType type) { LogicalTypeRoot root = type.getTypeRoot(); if (root == LogicalTypeRoot.SMALLINT) { diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java index 3f997b4..e89fbc3 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java @@ -43,6 +43,6 @@ public interface JDBCRowConverter extends Serializable { */ @FunctionalInterface interface JDBCFieldConverter extends Serializable { - Object convert(Object value); + Object convert(Object value) throws SQLException; } } diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java index 102f079..82b6be8 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java @@ -18,7 +18,15 @@ package org.apache.flink.api.java.io.jdbc.source.row.converter; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; + +import org.postgresql.jdbc.PgArray; +import org.postgresql.util.PGobject; /** 
* Row converter for Postgres. @@ -28,4 +36,39 @@ public class PostgresRowConverter extends AbstractJDBCRowConverter { public PostgresRowConverter(RowType rowType) { super(rowType); } + + @Override + public JDBCFieldConverter createConverter(LogicalType type) { + LogicalTypeRoot root = type.getTypeRoot(); + + if (root == LogicalTypeRoot.ARRAY) { + ArrayType arrayType = (ArrayType) type; + + // PG's bytea[] is wrapped in PGobject, rather than primitive byte arrays + if (LogicalTypeChecks.hasFamily(arrayType.getElementType(), LogicalTypeFamily.BINARY_STRING)) { + + return v -> { + PgArray pgArray = (PgArray) v; + Object[] in = (Object[]) pgArray.getArray(); + + Object[] out = ne
[flink] branch master updated (c4b44e9 -> 6128bd1)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c4b44e9 [FLINK-13639] Refactor the IntermediateResultPartitionID to consist of IntermediateDataSetID and partitionIndex add 6128bd1 [FLINK-17175][core] StringUtils.arrayToString() should consider Object[] lastly No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/flink/util/StringUtils.java | 6 +++--- .../src/test/java/org/apache/flink/util/StringUtilsTest.java | 11 +++ 2 files changed, 14 insertions(+), 3 deletions(-)
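The ordering in FLINK-17175 matters because `instanceof Object[]` is true for every reference-array type, so the catch-all branch must be checked after the specific ones. A self-contained sketch of the idea (the helper below is illustrative, not the actual `StringUtils.arrayToString()` implementation):

```java
import java.util.Arrays;

public class ArrayToStringSketch {

    // Illustrative helper: concrete array types are checked first, because
    // every reference array -- String[], Integer[], even byte[][] -- is
    // also an Object[]. If the Object[] branch came first, it would shadow
    // the specific handling; that ordering is what FLINK-17175 addresses.
    static String arrayToString(Object array) {
        if (array instanceof int[]) {
            return Arrays.toString((int[]) array);
        }
        if (array instanceof byte[]) {
            return Arrays.toString((byte[]) array);
        }
        if (array instanceof byte[][]) {
            // deep conversion so nested byte[] elements print their
            // contents rather than hash codes (cf. FLINK-16817)
            return Arrays.deepToString((byte[][]) array);
        }
        // the catch-all branch must be considered last
        if (array instanceof Object[]) {
            return Arrays.deepToString((Object[]) array);
        }
        throw new IllegalArgumentException("not an array");
    }

    public static void main(String[] args) {
        System.out.println(arrayToString(new byte[][]{{1, 2}, {3}})); // [[1, 2], [3]]
        System.out.println(arrayToString(new String[]{"a", "b"}));    // [a, b]
    }
}
```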
[flink] branch master updated: [FLINK-16473][doc][jdbc] add documentation for JDBCCatalog and PostgresCatalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new bf05c90 [FLINK-16473][doc][jdbc] add documentation for JDBCCatalog and PostgresCatalog bf05c90 is described below commit bf05c90f081938caccd764b3b7294b0356728122 Author: bowen.li AuthorDate: Sat Apr 18 14:53:49 2020 -0700 [FLINK-16473][doc][jdbc] add documentation for JDBCCatalog and PostgresCatalog closes #11804 --- docs/dev/table/catalogs.md| 95 +++ docs/dev/table/catalogs.zh.md | 91 + docs/dev/table/connect.md | 4 ++ docs/dev/table/connect.zh.md | 4 ++ 4 files changed, 194 insertions(+) diff --git a/docs/dev/table/catalogs.md b/docs/dev/table/catalogs.md index 862ae3a..a20c617 100644 --- a/docs/dev/table/catalogs.md +++ b/docs/dev/table/catalogs.md @@ -28,6 +28,10 @@ One of the most crucial aspects of data processing is managing metadata. It may be transient metadata like temporary tables, or UDFs registered against the table environment. Or permanent metadata, like that in a Hive Metastore. Catalogs provide a unified API for managing metadata and making it accessible from the Table API and SQL Queries. +Catalogs enable users to reference existing metadata in their data systems, and automatically map it to Flink's corresponding metadata. +For example, Flink can map JDBC tables to Flink tables automatically, and users don't have to manually re-write DDLs in Flink. +Catalogs greatly simplify the steps required to get started with Flink against users' existing systems, and greatly enhance the user experience. + * This will be replaced by the TOC {:toc} @@ -37,6 +41,97 @@ Or permanent metadata, like that in a Hive Metastore. Catalogs provide a unified The `GenericInMemoryCatalog` is an in-memory implementation of a catalog. All objects will be available only for the lifetime of the session. 
+### JDBCCatalog + +The `JDBCCatalog` enables users to connect Flink to relational databases over the JDBC protocol. + +#### PostgresCatalog + +`PostgresCatalog` is the only implementation of JDBC Catalog at the moment. + +#### Usage of JDBCCatalog + +Set up a `JDBCCatalog` with the following parameters: + +- name: required, name of the catalog +- default database: required, default database to connect to +- username: required, username of the Postgres account +- password: required, password of the account +- base url: required, should be of the format "jdbc:postgresql://<host>:<port>", and should not contain the database name + + + +{% highlight java %} + +EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); +TableEnvironment tableEnv = TableEnvironment.create(settings); + +String name = "mypg"; +String defaultDatabase = "mydb"; +String username = "..."; +String password = "..."; +String baseUrl = "..."; + +JDBCCatalog catalog = new JDBCCatalog(name, defaultDatabase, username, password, baseUrl); +tableEnv.registerCatalog("mypg", catalog); + +// set the JDBCCatalog as the current catalog of the session +tableEnv.useCatalog("mypg"); +{% endhighlight %} + +{% highlight scala %} + +val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() +val tableEnv = TableEnvironment.create(settings) + +val name = "mypg" +val defaultDatabase = "mydb" +val username = "..." +val password = "..." +val baseUrl = "..." + +val catalog = new JDBCCatalog(name, defaultDatabase, username, password, baseUrl) +tableEnv.registerCatalog("mypg", catalog) + +// set the JDBCCatalog as the current catalog of the session +tableEnv.useCatalog("mypg") +{% endhighlight %} + +{% highlight sql %} +CREATE CATALOG mypg WITH( +'type'='jdbc', +'default-database'='...', +'username'='...', +'password'='...', +'base-url'='...' +); + +USE CATALOG mypg; +{% endhighlight %} + +{% highlight yaml %} + +execution: +planner: blink +... 
+current-catalog: mypg # set the JDBCCatalog as the current catalog of the session +current-database: mydb + +catalogs: + - name: mypg + type: jdbc + default-database: mydb + username: ... + password: ... + base-url: ... +{% endhighlight %} + + + + ### HiveCatalog The `HiveCatalog` serves two purposes; as persistent storage for pure Flink metadata, and as an interface for reading and writing existing Hive metadata. diff --git a/docs/dev/table/catalogs.zh.md b/docs/dev/table/catalogs.zh.md index c
[flink] branch master updated (6acd646 -> 978d7e9)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 6acd646 [FLINK-17084][table] Implement input type inference and output type strategy for ARRAY/ROW/MAP built-in functions (#11701) add 3bfeea1 [FLINK-16813][jdbc] JDBCInputFormat doesn't correctly map Short add aa9bcc1 [FLINK-16820][jdbc] support reading timestamp, data, and time in JDBCTableSource add 978d7e9 [FLINK-16815][jdbc] add e2e tests for reading primitive data types from postgres with JDBCTableSource and PostgresCatalog No new revisions were added by this update. Summary of changes: .../flink/api/java/io/jdbc/JdbcTypeUtil.java | 18 + .../api/java/io/jdbc/dialect/JDBCDialects.java | 1 - .../row/converter/AbstractJDBCRowConverter.java| 23 +- .../source/row/converter/JDBCRowConverter.java | 9 +++ .../io/jdbc/catalog/PostgresCatalogITCase.java | 10 +++ .../java/io/jdbc/catalog/PostgresCatalogTest.java | 17 +++-- .../io/jdbc/catalog/PostgresCatalogTestBase.java | 86 ++ 7 files changed, 92 insertions(+), 72 deletions(-)
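Regarding the timestamp, date, and time support (FLINK-16820) summarized above: a JDBC source has to bridge the `java.sql` temporal classes the driver returns to `java.time` values. A minimal stdlib sketch of those per-type conversions (class and method names here are illustrative, not Flink's actual converter code):

```java
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

public class JdbcTemporalSketch {

    // JDBC result sets expose java.sql temporal types; each one carries
    // a direct conversion to its java.time counterpart.
    static LocalDate fromSqlDate(Date d) {
        return d.toLocalDate();
    }

    static LocalTime fromSqlTime(Time t) {
        return t.toLocalTime();
    }

    static LocalDateTime fromSqlTimestamp(Timestamp ts) {
        return ts.toLocalDateTime();
    }

    public static void main(String[] args) {
        System.out.println(fromSqlDate(Date.valueOf("2020-04-15")));
        System.out.println(fromSqlTimestamp(Timestamp.valueOf("2020-04-15 22:26:22")));
    }
}
```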
[flink] branch master updated (989bc02 -> 3fd568a)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 989bc02 [FLINK-16817][core] StringUtils.arrayToString() doesn't convert array of byte array correctly add 3fd568a [FLINK-16811][jdbc] introduce row converter API to JDBCDialect No new revisions were added by this update. Summary of changes: .../flink/api/java/io/jdbc/JDBCInputFormat.java| 18 +++--- .../flink/api/java/io/jdbc/JDBCTableSource.java| 2 ++ .../api/java/io/jdbc/dialect/JDBCDialect.java | 9 + .../api/java/io/jdbc/dialect/JDBCDialects.java | 20 +++ .../row/converter/AbstractJDBCRowConverter.java| 39 ++ .../source/row/converter/DerbyRowConverter.java| 12 --- .../source/row/converter/JDBCRowConverter.java}| 17 ++ .../source/row/converter/MySQLRowConverter.java| 12 --- .../source/row/converter/PostgresRowConverter.java | 12 --- .../flink/api/java/io/jdbc/JDBCFullTest.java | 5 ++- .../api/java/io/jdbc/JDBCInputFormatTest.java | 22 .../flink/api/java/io/jdbc/JdbcTestFixture.java| 5 +++ 12 files changed, 123 insertions(+), 50 deletions(-) copy flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java => flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/AbstractJDBCRowConverter.java (57%) copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/ExpressionParserException.java => flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/DerbyRowConverter.java (75%) copy flink-connectors/{flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java => flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/JDBCRowConverter.java} (64%) copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/ExpressionParserException.java => 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/MySQLRowConverter.java (75%) copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/ExpressionParserException.java => flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/source/row/converter/PostgresRowConverter.java (74%)
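The converter API introduced by FLINK-16811 follows a common pattern: one field-level converter is created per column from its logical type, and the row converter applies them positionally to every record. A self-contained sketch of that pattern, with plain `Object[]` rows standing in for Flink's `Row` type (all names below are illustrative):

```java
import java.io.Serializable;
import java.util.Arrays;

public class RowConverterSketch {

    // Mirrors the shape of a per-field converter: a serializable
    // function from the JDBC-provided value to the runtime value.
    @FunctionalInterface
    interface FieldConverter extends Serializable {
        Object convert(Object value);
    }

    // Converters are built once per row type, then applied per record.
    static Object[] convertRow(Object[] fields, FieldConverter[] converters) {
        Object[] out = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            out[i] = converters[i].convert(fields[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        // e.g. a dialect that widens JDBC SMALLINT values to int
        FieldConverter shortToInt = v -> ((Number) v).intValue();
        FieldConverter identity = v -> v;

        Object[] converted = convertRow(
                new Object[]{(short) 7, "x"},
                new FieldConverter[]{shortToInt, identity});
        System.out.println(Arrays.toString(converted)); // [7, x]
    }
}
```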
[flink] branch master updated (a9f3e2f -> 989bc02)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from a9f3e2f [FLINK-17010] Remove --quiet from s3 get operation add 989bc02 [FLINK-16817][core] StringUtils.arrayToString() doesn't convert array of byte array correctly No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/flink/util/StringUtils.java | 11 +++ 1 file changed, 11 insertions(+)
[flink-web] 01/03: regenerate site
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit fbc6bd5fdc9feb5478a4af304841081e6bb81a17 Author: bowen.li AuthorDate: Fri Mar 27 12:41:40 2020 -0700 regenerate site --- content/blog/feed.xml | 225 +++-- content/blog/index.html| 38 ++- content/blog/page10/index.html | 45 ++- content/blog/page11/index.html | 25 ++ content/blog/page2/index.html | 36 +- content/blog/page3/index.html | 38 ++- content/blog/page4/index.html | 40 ++- content/blog/page5/index.html | 38 ++- content/blog/page6/index.html | 36 +- content/blog/page7/index.html | 37 ++- content/blog/page8/index.html | 38 ++- content/blog/page9/index.html | 44 +-- .../2020/03/26/flink-for-data-warehouse.html | 369 + content/index.html | 8 +- content/zh/index.html | 8 +- 15 files changed, 775 insertions(+), 250 deletions(-) diff --git a/content/blog/feed.xml b/content/blog/feed.xml index 803c8fb..c3bd365 100644 --- a/content/blog/feed.xml +++ b/content/blog/feed.xml @@ -7,6 +7,132 @@ https://flink.apache.org/blog/feed.xml"; rel="self" type="application/rss+xml" /> +Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration +<p>In this blog post, you will learn our motivation behind the Flink-Hive integration, and how Flink 1.10 can help modernize your data warehouse.</p> + +<div class="page-toc"> +<ul id="markdown-toc"> + <li><a href="#introduction" id="markdown-toc-introduction">Introduction</a></li> + <li><a href="#flink-and-its-integration-with-hive-comes-into-the-scene" id="markdown-toc-flink-and-its-integration-with-hive-comes-into-the-scene">Flink and Its Integration With Hive Comes into the Scene</a><ul> + <li><a href="#unified-metadata-management" id="markdown-toc-unified-metadata-management">Unified Metadata Management</a></li> + <li><a href="#stream-processing" id="markdown-toc-stream-processing">Stream Processing</a></li> + <li><a 
href="#compatible-with-more-hive-versions" id="markdown-toc-compatible-with-more-hive-versions">Compatible with More Hive Versions</a></li> + <li><a href="#reuse-hive-user-defined-functions-udfs" id="markdown-toc-reuse-hive-user-defined-functions-udfs">Reuse Hive User Defined Functions (UDFs)</a></li> + <li><a href="#enhanced-read-and-write-on-hive-data" id="markdown-toc-enhanced-read-and-write-on-hive-data">Enhanced Read and Write on Hive Data</a></li> + <li><a href="#formats" id="markdown-toc-formats">Formats</a></li> + <li><a href="#more-data-types" id="markdown-toc-more-data-types">More Data Types</a></li> + <li><a href="#roadmap" id="markdown-toc-roadmap">Roadmap</a></li> +</ul> + </li> + <li><a href="#summary" id="markdown-toc-summary">Summary</a></li> +</ul> + +</div> + +<h2 id="introduction">Introduction</h2> + +<p>What are some of the latest requirements for your data warehouse and data infrastructure in 2020?</p> + +<p>We’ve came up with some for you.</p> + +<p>Firstly, today’s business is shifting to a more real-time fashion, and thus demands abilities to process online streaming data with low latency for near-real-time or even real-time analytics. People become less and less tolerant of delays between when data is generated and when it arrives at their hands, ready to use. Hours or even days of delay is not acceptable anymore. Users are expecting minutes, or even seconds, of end-to-end latency for data in their warehouse, to get quic [...] + +<p>Secondly, the infrastructure should be able to handle both offline batch data for offline analytics and exploration, and online streaming data for more timely analytics. Both are indispensable as they both have very valid use cases. Apart from the real time processing mentioned above, batch processing would still exist as it’s good for ad hoc queries and explorations, and full-size calculations. Your
[flink-web] 02/03: reset blog post date to 2020/3/27
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 1cc3b728ea308b4f0a7bf0d58860050e85589d7c Author: bowen.li AuthorDate: Fri Mar 27 12:47:21 2020 -0700 reset blog post date to 2020/3/27 --- ...ink-for-data-warehouse.md => 2020-03-27-flink-for-data-warehouse.md} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_posts/2020-02-26-flink-for-data-warehouse.md b/_posts/2020-03-27-flink-for-data-warehouse.md similarity index 99% rename from _posts/2020-02-26-flink-for-data-warehouse.md rename to _posts/2020-03-27-flink-for-data-warehouse.md index d47368f..03d8b8f 100644 --- a/_posts/2020-02-26-flink-for-data-warehouse.md +++ b/_posts/2020-03-27-flink-for-data-warehouse.md @@ -1,7 +1,7 @@ --- layout: post title: "Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration" -date: 2020-03-26T02:30:00.000Z +date: 2020-03-27T02:30:00.000Z categories: features authors: - bowen:
[flink-web] 03/03: regenerate site
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 550badb06ba694751ed5a835f2fdcad9ce8f8282 Author: bowen.li AuthorDate: Fri Mar 27 12:48:53 2020 -0700 regenerate site --- content/blog/page10/index.html| 2 +- content/blog/page11/index.html| 2 +- content/blog/page2/index.html | 2 +- content/blog/page3/index.html | 2 +- content/blog/page4/index.html | 2 +- content/blog/page5/index.html | 2 +- content/blog/page6/index.html | 2 +- content/blog/page7/index.html | 2 +- content/blog/page8/index.html | 2 +- content/blog/page9/index.html | 2 +- content/features/2020/03/{26 => 27}/flink-for-data-warehouse.html | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/content/blog/page10/index.html b/content/blog/page10/index.html index 7bccfc7..b37734e 100644 --- a/content/blog/page10/index.html +++ b/content/blog/page10/index.html @@ -376,7 +376,7 @@ and offers a new API including definition of flexible windows. - Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration + Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration diff --git a/content/blog/page11/index.html b/content/blog/page11/index.html index f757c49..62d1945 100644 --- a/content/blog/page11/index.html +++ b/content/blog/page11/index.html @@ -264,7 +264,7 @@ academic and open source project that Flink originates from. 
- Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration + Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration diff --git a/content/blog/page2/index.html b/content/blog/page2/index.html index b2b9023..3de31b4 100644 --- a/content/blog/page2/index.html +++ b/content/blog/page2/index.html @@ -355,7 +355,7 @@ - Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration + Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration diff --git a/content/blog/page3/index.html b/content/blog/page3/index.html index fe76cc1..a8131a3 100644 --- a/content/blog/page3/index.html +++ b/content/blog/page3/index.html @@ -356,7 +356,7 @@ for more details. - Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration + Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration diff --git a/content/blog/page4/index.html b/content/blog/page4/index.html index a20e2a9..b5b5f31 100644 --- a/content/blog/page4/index.html +++ b/content/blog/page4/index.html @@ -366,7 +366,7 @@ Please check the https://issues.apache.org/jira/secure/ReleaseNote.jspa - Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration + Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration diff --git a/content/blog/page5/index.html b/content/blog/page5/index.html index 7b6a542..d6daf3b 100644 --- a/content/blog/page5/index.html +++ b/content/blog/page5/index.html @@ -362,7 +362,7 @@ - Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration + Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration diff --git a/content/blog/page6/index.html b/content/blog/page6/index.html index fd2d3d1..9df436b 100644 --- a/content/blog/page6/index.html +++ b/content/blog/page6/index.html @@ -366,7 +366,7 @@ what’s coming in Flink 1.4.0 as 
well as a preview of what the Flink community - Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration + Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration diff --git a/content/blog/page7/index.html b/content/blog/page7/index.html index 4141db2..b88aade 100644 --- a/content/blog/page7/index.html +++ b/content/blog/page7/index.html @@ -360,7 +360,7 @@ - Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration + Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration diff --git a/content/blog/page8/index.html b/content/blog/page8/index.html index 55b994d..f91f090 100644 --- a/cont
[flink-web] branch asf-site updated (4d2bf75 -> 550badb)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 4d2bf75 add blog for flink with data warehouse and hive integration new fbc6bd5 regenerate site new 1cc3b72 reset blog post date to 2020/3/27 new 550badb regenerate site The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: ...e.md => 2020-03-27-flink-for-data-warehouse.md} | 2 +- content/blog/feed.xml | 225 +++-- content/blog/index.html| 38 ++- content/blog/page10/index.html | 45 ++- content/blog/page11/index.html | 25 ++ content/blog/page2/index.html | 36 +- content/blog/page3/index.html | 38 ++- content/blog/page4/index.html | 40 ++- content/blog/page5/index.html | 38 ++- content/blog/page6/index.html | 36 +- content/blog/page7/index.html | 37 ++- content/blog/page8/index.html | 38 ++- content/blog/page9/index.html | 44 +-- .../2020/03/27/flink-for-data-warehouse.html | 369 + content/index.html | 8 +- content/zh/index.html | 8 +- 16 files changed, 776 insertions(+), 251 deletions(-) rename _posts/{2020-02-26-flink-for-data-warehouse.md => 2020-03-27-flink-for-data-warehouse.md} (99%) create mode 100644 content/features/2020/03/27/flink-for-data-warehouse.html
[flink-web] branch asf-site updated: add blog for flink with data warehouse and hive integration
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new 4d2bf75 add blog for flink with data warehouse and hive integration 4d2bf75 is described below commit 4d2bf75b43961a6cd0e366a3f619ce54acdb7ac3 Author: bowen.li AuthorDate: Mon Feb 24 09:17:33 2020 -0800 add blog for flink with data warehouse and hive integration --- _posts/2020-02-26-flink-for-data-warehouse.md | 117 ++ 1 file changed, 117 insertions(+) diff --git a/_posts/2020-02-26-flink-for-data-warehouse.md b/_posts/2020-02-26-flink-for-data-warehouse.md new file mode 100644 index 000..d47368f --- /dev/null +++ b/_posts/2020-02-26-flink-for-data-warehouse.md @@ -0,0 +1,117 @@ +--- +layout: post +title: "Flink as Unified Engine for Modern Data Warehousing: Production-Ready Hive Integration" +date: 2020-03-26T02:30:00.000Z +categories: features +authors: +- bowen: + name: Bowen Li + twitter: Bowen__Li +--- + +In this blog post, you will learn our motivation behind the Flink-Hive integration, and how Flink 1.10 can help modernize your data warehouse. + +{% toc %} + + +## Introduction + +What are some of the latest requirements for your data warehouse and data infrastructure in 2020? + +We’ve come up with some for you. + +Firstly, today’s business is shifting to a more real-time fashion, and thus demands the ability to process online streaming data with low latency for near-real-time or even real-time analytics. People become less and less tolerant of delays between when data is generated and when it arrives at their hands, ready to use. Hours or even days of delay are not acceptable anymore. Users are expecting minutes, or even seconds, of end-to-end latency for data in their warehouse, to get quicker-than- [...] 
+ +Secondly, the infrastructure should be able to handle both offline batch data for offline analytics and exploration, and online streaming data for more timely analytics. Both are indispensable as they both have very valid use cases. Apart from the real time processing mentioned above, batch processing would still exist as it’s good for ad hoc queries and explorations, and full-size calculations. Your modern infrastructure should not force users to choose between one or the other, it shou [...] + +Thirdly, the data players, including data engineers, data scientists, analysts, and operations, urge a more unified infrastructure than ever before for easier ramp-up and higher working efficiency. The big data landscape has been fragmented for years - companies may have one set of infrastructure for real time processing, one set for batch, one set for OLAP, etc. That, oftentimes, comes as a result of the legacy of lambda architecture, which was popular in the era when stream processors [...] + +If any of these resonate with you, you just found the right post to read: we have never been this close to the vision by strengthening Flink’s integration with Hive to a production grade. + + +## Flink and Its Integration With Hive Comes into the Scene + +Apache Flink has been a proven scalable system to handle extremely high workload of streaming data in super low latency in many giant tech companies. + +Despite its huge success in the real time processing domain, at its deep root, Flink has been faithfully following its inborn philosophy of being [a unified data processing engine for both batch and streaming](https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html), and taking a streaming-first approach in its architecture to do batch processing. By making batch a special case for streaming, Flink really leverages its cutting edge streaming capabilities and applies t [...] 
+ +On the other hand, Apache Hive has established itself as a focal point of the data warehousing ecosystem. It serves not only as a SQL engine for big data analytics and ETL, but also as a data management platform, where data is discovered and defined. As business evolves, it puts new requirements on data warehouses. + +Thus we started integrating Flink and Hive as a beta version in Flink 1.9. Over the past few months, we have been listening to users’ requests and feedback, extensively enhancing our product, and running rigorous benchmarks (which will be published soon separately). I’m glad to announce that the integration between Flink and Hive is at production grade in [Flink 1.10](https://flink.apache.org/news/2020/02/11/release-1.10.0.html) and we can’t wait to walk you through the details. + + +### Unified Metadata Management + +Hive Metastore has evolved into the de facto metadata hub over the years in the Hadoop, or even the cloud, ecosystem. Many companies have a single Hive Metastore service instance in production to manage all of their schemas, either Hive or non-Hiv
[flink] branch master updated: [hotfix][jdbc] add license header to PostgresCatalogITCase
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new eeb9108 [hotfix][jdbc] add license header to PostgresCatalogITCase eeb9108 is described below commit eeb91086420a956eba3439ff9530685e25f772cf Author: bowen.li AuthorDate: Thu Mar 26 15:57:23 2020 -0700 [hotfix][jdbc] add license header to PostgresCatalogITCase --- .../java/io/jdbc/catalog/PostgresCatalogITCase.java| 18 ++ 1 file changed, 18 insertions(+) diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java index 143206c..8421cf6 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.api.java.io.jdbc.catalog; import org.apache.flink.table.api.EnvironmentSettings;
[flink] branch master updated: [FLINK-16810][jdbc] add back PostgresCatalogITCase
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 8cf8d8d [FLINK-16810][jdbc] add back PostgresCatalogITCase 8cf8d8d is described below commit 8cf8d8d86121092714ef71109f91670506457710 Author: bowen.li AuthorDate: Thu Mar 26 11:22:53 2020 -0700 [FLINK-16810][jdbc] add back PostgresCatalogITCase add back PostgresCatalogITCase which is supposed to be part of #11468 --- .../io/jdbc/catalog/PostgresCatalogITCase.java | 72 ++ 1 file changed, 72 insertions(+) diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java new file mode 100644 index 000..143206c --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java @@ -0,0 +1,72 @@ +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableUtils; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.util.List; + +import static org.apache.flink.api.java.io.jdbc.catalog.PostgresCatalog.DEFAULT_DATABASE; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; +import static org.junit.Assert.assertEquals; + +/** + * E2E test for {@link PostgresCatalog}. 
+ */ +public class PostgresCatalogITCase extends PostgresCatalogTestBase { + + @Test + public void test_withoutSchema() throws Exception { + TableEnvironment tEnv = getTableEnvWithPgCatalog(); + + List<Row> results = TableUtils.collectToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE1))); + assertEquals("[1]", results.toString()); + } + + @Test + public void test_withSchema() throws Exception { + TableEnvironment tEnv = getTableEnvWithPgCatalog(); + + List<Row> results = TableUtils.collectToList( + tEnv.sqlQuery(String.format("select * from `%s`", PostgresTablePath.fromFlinkTableName(TABLE1)))); + assertEquals("[1]", results.toString()); + } + + @Test + public void test_fullPath() throws Exception { + TableEnvironment tEnv = getTableEnvWithPgCatalog(); + + List<Row> results = TableUtils.collectToList( + tEnv.sqlQuery(String.format("select * from %s.%s.`%s`", + TEST_CATALOG_NAME, + DEFAULT_DATABASE, + PostgresTablePath.fromFlinkTableName(TABLE1)))); + assertEquals("[1]", results.toString()); + } + + @Test + public void test_insert() throws Exception { + TableEnvironment tEnv = getTableEnvWithPgCatalog(); + + tEnv.sqlUpdate(String.format("insert into %s select * from `%s`", TABLE4, TABLE1)); + tEnv.execute("test"); + + List<Row> results = TableUtils.collectToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE1))); + assertEquals("[1]", results.toString()); + } + + private TableEnvironment getTableEnvWithPgCatalog() { + EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); + TableEnvironment tableEnv = TableEnvironment.create(settings); + tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1); + + tableEnv.registerCatalog(TEST_CATALOG_NAME, catalog); + tableEnv.useCatalog(TEST_CATALOG_NAME); + return tableEnv; + } +}
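The ITCase above addresses Postgres tables either without a schema, as `` `schema.table` ``, or via a full `catalog.database.table` path. As a rough, self-contained sketch of the path handling behind `PostgresTablePath.fromFlinkTableName` (the class here is a hypothetical stand-in, and the defaulting rule to the `public` schema mirrors Postgres conventions rather than quoting the actual Flink source):

```java
// Illustrative sketch only, not the actual Flink class: how a Flink table name
// such as "public.t1" can be split into a Postgres (schema, table) pair,
// falling back to the "public" schema when no schema prefix is given.
public class PostgresTablePathSketch {
    final String schemaName;
    final String tableName;

    PostgresTablePathSketch(String schemaName, String tableName) {
        this.schemaName = schemaName;
        this.tableName = tableName;
    }

    // A Flink table name is either "table" (implicit "public" schema)
    // or "schema.table"; anything else is rejected.
    static PostgresTablePathSketch fromFlinkTableName(String flinkTableName) {
        String[] parts = flinkTableName.split("\\.");
        if (parts.length == 1) {
            return new PostgresTablePathSketch("public", parts[0]);
        } else if (parts.length == 2) {
            return new PostgresTablePathSketch(parts[0], parts[1]);
        }
        throw new IllegalArgumentException("Invalid Postgres table name: " + flinkTableName);
    }

    String getFullPath() {
        return schemaName + "." + tableName;
    }

    public static void main(String[] args) {
        System.out.println(fromFlinkTableName("t1").getFullPath());          // public.t1
        System.out.println(fromFlinkTableName("myschema.t2").getFullPath()); // myschema.t2
    }
}
```

Under this model, `select * from t1` and ``select * from `public.t1` `` resolve to the same Postgres table, which is what the `test_withoutSchema`/`test_withSchema` pair exercises.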
[flink] branch master updated (358e4e7 -> 25f2d62)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 358e4e7 [hotfix][jdbc] rename PostgresCatalogITCase to PostgresCatalogTest add 25f2d62 [FLINK-16498] connect PostgresCatalog to table planner No new revisions were added by this update. Summary of changes: .../api/java/io/jdbc/catalog/JDBCCatalog.java | 11 +- .../api/java/io/jdbc/catalog/PostgresCatalog.java | 23 ++- .../java/io/jdbc/catalog/PostgresTablePath.java| 6 +- .../api/java/io/jdbc/dialect/JDBCDialects.java | 5 + .../table/descriptors/JDBCCatalogDescriptor.java | 8 + .../java/io/jdbc/catalog/PostgresCatalogTest.java | 181 +-- ...talogTest.java => PostgresCatalogTestBase.java} | 198 + .../catalog/factory/JDBCCatalogFactoryTest.java| 3 + 8 files changed, 93 insertions(+), 342 deletions(-) copy flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/{PostgresCatalogTest.java => PostgresCatalogTestBase.java} (54%)
[flink] branch master updated: [FLINK-16702] develop JDBCCatalogFactory, descriptor, and validator for service discovery
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new ce52f3b [FLINK-16702] develop JDBCCatalogFactory, descriptor, and validator for service discovery ce52f3b is described below commit ce52f3bcddc032cc4b3cb54c33eb1376df42c887 Author: bowen.li AuthorDate: Fri Mar 20 17:39:09 2020 -0700 [FLINK-16702] develop JDBCCatalogFactory, descriptor, and validator for service discovery closes #16702 --- flink-connectors/flink-jdbc/pom.xml| 8 ++ .../java/io/jdbc/catalog/AbstractJDBCCatalog.java | 15 .../jdbc/catalog/factory/JDBCCatalogFactory.java | 93 ++ .../table/descriptors/JDBCCatalogDescriptor.java | 60 ++ .../table/descriptors/JDBCCatalogValidator.java| 43 ++ .../org.apache.flink.table.factories.TableFactory | 1 + .../catalog/factory/JDBCCatalogFactoryTest.java| 85 .../descriptors/JDBCCatalogDescriptorTest.java | 63 +++ 8 files changed, 368 insertions(+) diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml index cb7afab..0af5588 100644 --- a/flink-connectors/flink-jdbc/pom.xml +++ b/flink-connectors/flink-jdbc/pom.xml @@ -76,6 +76,14 @@ under the License. 
test + + org.apache.flink + flink-table-common + ${project.version} + test-jar + test + + org.apache.flink diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java index 523de83..6e7dd02 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java @@ -102,6 +102,21 @@ public abstract class AbstractJDBCCatalog extends AbstractCatalog { LOG.info("Catalog {} closing", getName()); } + // - getters -- + + public String getUsername() { + return username; + } + + public String getPassword() { + return pwd; + } + + public String getBaseUrl() { + return baseUrl; + } + + // -- table factory -- public Optional getTableFactory() { diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactory.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactory.java new file mode 100644 index 000..05f08c7 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog.factory; + +import org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.JDBCCatalogValidator; +import org.apache.flink.table.factories.CatalogFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; +import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; +import static org.apache.flink.table.descriptors.JDBCCatalogV
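The commit above wires `JDBCCatalogFactory` into Flink's `TableFactory` service discovery. The gist of that mechanism is matching user-supplied catalog properties against a factory's required context; the following simplified, self-contained sketch shows the idea (the key names echo the `CATALOG_TYPE`/`CATALOG_PROPERTY_VERSION` constants, but the class and matching rule are illustrative assumptions, not the real `TableFactoryService` code):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of property-based factory discovery: a catalog factory
// declares a required context (e.g. type=jdbc), and a descriptor's properties
// are matched against it to select the right catalog implementation.
public class FactoryDiscoverySketch {

    // The context every matching descriptor must carry.
    static Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("type", "jdbc");
        context.put("property-version", "1");
        return context;
    }

    // A factory matches when every required key/value pair is present,
    // with the same value, in the user-supplied catalog properties.
    static boolean matches(Map<String, String> properties) {
        for (Map.Entry<String, String> e : requiredContext().entrySet()) {
            if (!e.getValue().equals(properties.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("type", "jdbc");
        props.put("property-version", "1");
        props.put("default-database", "postgres");
        System.out.println(matches(props)); // true
    }
}
```

Extra keys such as `default-database` are allowed by the match and are instead checked against the factory's supported-properties list in the real validator.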
[flink] branch master updated (ce52f3b -> 358e4e7)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ce52f3b [FLINK-16702] develop JDBCCatalogFactory, descriptor, and validator for service discovery add 358e4e7 [hotfix][jdbc] rename PostgresCatalogITCase to PostgresCatalogTest No new revisions were added by this update. Summary of changes: .../catalog/{PostgresCatalogITCase.java => PostgresCatalogTest.java}| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/{PostgresCatalogITCase.java => PostgresCatalogTest.java} (99%)
[flink] branch master updated (8a27bd9 -> 75ad29c)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 8a27bd9 [FLINK-16691][python][docs] Improve Python UDF documentation to remind users to install PyFlink on the cluster new 74b8bde [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog new 75ad29c [FLINK-16472] support precision of timestamp and time data types The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../src/test/resources/log4j2-test.properties | 28 -- flink-connectors/flink-jdbc/pom.xml| 36 ++- .../java/io/jdbc/catalog/AbstractJDBCCatalog.java | 277 ++ .../api/java/io/jdbc/catalog/JDBCCatalog.java | 84 ++ .../api/java/io/jdbc/catalog/JDBCCatalogUtils.java | 54 .../api/java/io/jdbc/catalog/PostgresCatalog.java | 322 .../java/io/jdbc/catalog/PostgresTablePath.java| 95 ++ .../api/java/io/jdbc/dialect/JDBCDialects.java | 10 +- .../java/io/jdbc/catalog/JDBCCatalogUtilsTest.java | 25 +- .../io/jdbc/catalog/PostgresCatalogITCase.java | 325 + .../io/jdbc/catalog/PostgresTablePathTest.java | 12 +- 11 files changed, 1222 insertions(+), 46 deletions(-) delete mode 100644 flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalog.java create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtils.java create mode 100644 flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java create mode 100644 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePath.java copy flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java => flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/JDBCCatalogUtilsTest.java (60%) create mode 100644 flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java copy flink-table/flink-table-common/src/test/java/org/apache/flink/table/module/CoreModuleTest.java => flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresTablePathTest.java (72%)
[flink] 01/02: [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 74b8bdee9fb0bf7cfc27ca8d992dac2a07473a0c Author: bowen.li AuthorDate: Fri Mar 6 14:05:27 2020 -0800 [FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog closes #11336 --- .../src/test/resources/log4j2-test.properties | 28 -- flink-connectors/flink-jdbc/pom.xml| 36 ++- .../java/io/jdbc/catalog/AbstractJDBCCatalog.java | 277 ++ .../api/java/io/jdbc/catalog/JDBCCatalog.java | 84 ++ .../api/java/io/jdbc/catalog/JDBCCatalogUtils.java | 54 .../api/java/io/jdbc/catalog/PostgresCatalog.java | 323 .../java/io/jdbc/catalog/PostgresTablePath.java| 95 ++ .../api/java/io/jdbc/dialect/JDBCDialects.java | 10 +- .../java/io/jdbc/catalog/JDBCCatalogUtilsTest.java | 44 +++ .../io/jdbc/catalog/PostgresCatalogITCase.java | 325 + .../io/jdbc/catalog/PostgresTablePathTest.java | 33 +++ 11 files changed, 1275 insertions(+), 34 deletions(-) diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties deleted file mode 100644 index 835c2ec..000 --- a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j2-test.properties +++ /dev/null @@ -1,28 +0,0 @@ - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# Set root logger level to OFF to not flood build logs -# set manually to INFO for debugging purposes -rootLogger.level = OFF -rootLogger.appenderRef.test.ref = TestLogger - -appender.testlogger.name = TestLogger -appender.testlogger.type = CONSOLE -appender.testlogger.target = SYSTEM_ERR -appender.testlogger.layout.type = PatternLayout -appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml index 3e83311..cb7afab 100644 --- a/flink-connectors/flink-jdbc/pom.xml +++ b/flink-connectors/flink-jdbc/pom.xml @@ -35,6 +35,11 @@ under the License. jar + + 42.2.10 + 0.13.3 + + @@ -53,13 +58,17 @@ under the License. provided + + - org.apache.derby - derby - 10.14.2.0 - test + org.postgresql + postgresql + ${postgres.version} + provided + + org.apache.flink flink-test-utils_${scala.binary.version} @@ -89,5 +98,24 @@ under the License. 
${project.version} test + + + + + com.opentable.components + otj-pg-embedded + ${otj-pg-embedded.version} + test + + + + + + org.apache.derby + derby + 10.14.2.0 + test + + diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java new file mode 100644 index 000..523de83 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for ad
[flink] 02/02: [FLINK-16472] support precision of timestamp and time data types
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 75ad29cb9f4f377df27b71e67dbd33f36bb08bee Author: bowen.li AuthorDate: Sun Mar 8 20:58:27 2020 -0700 [FLINK-16472] support precision of timestamp and time data types closes #11336 --- .../api/java/io/jdbc/catalog/PostgresCatalog.java | 15 +++--- .../io/jdbc/catalog/PostgresCatalogITCase.java | 24 +++--- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java index d12f254..c598073 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java @@ -55,8 +55,6 @@ public class PostgresCatalog extends AbstractJDBCCatalog { private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class); - public static final String POSTGRES_TABLE_TYPE = "postgres"; - public static final String DEFAULT_DATABASE = "postgres"; // -- Postgres default objects that shouldn't be exposed to users -- @@ -236,6 +234,7 @@ public class PostgresCatalog extends AbstractJDBCCatalog { String pgType = metadata.getColumnTypeName(colIndex); int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); switch (pgType) { case PG_BOOLEAN: @@ -286,17 +285,17 @@ public class PostgresCatalog extends AbstractJDBCCatalog { case PG_TEXT_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); case PG_TIMESTAMP: - return DataTypes.TIMESTAMP(); + return DataTypes.TIMESTAMP(scale); case PG_TIMESTAMP_ARRAY: - return DataTypes.ARRAY(DataTypes.TIMESTAMP()); + return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale)); case PG_TIMESTAMPTZ: - 
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(); + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale); case PG_TIMESTAMPTZ_ARRAY: - return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()); + return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale)); case PG_TIME: - return DataTypes.TIME(); + return DataTypes.TIME(scale); case PG_TIME_ARRAY: - return DataTypes.ARRAY(DataTypes.TIME()); + return DataTypes.ARRAY(DataTypes.TIME(scale)); case PG_DATE: return DataTypes.DATE(); case PG_DATE_ARRAY: diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java index e103780..b197bf0 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java @@ -249,14 +249,14 @@ public class PostgresCatalogITCase { .field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3))) .field("character_varying", DataTypes.VARCHAR(20)) .field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20))) - .field("timestamp", DataTypes.TIMESTAMP()) - .field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP())) - .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) - .field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())) + .field("timestamp", DataTypes.TIMESTAMP(5)) + .field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5))) + .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_Z
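The diff above replaces the fixed-precision defaults (`DataTypes.TIMESTAMP()`, `DataTypes.TIME()`) with the scale reported by JDBC `ResultSetMetaData`. A minimal sketch of that mapping, with types rendered as plain strings for illustration (the method name and string rendering are assumptions; the real `PostgresCatalog` code returns `DataTypes` instances and covers many more types):

```java
// Illustrative sketch of the precision-aware mapping introduced by FLINK-16472:
// Postgres temporal types carry the scale reported by JDBC metadata instead of
// a fixed default precision.
public class PgTypeMappingSketch {

    // Maps a Postgres type name plus the JDBC-reported scale to a Flink SQL
    // type string, following the shape of the switch in the diff above.
    static String toFlinkType(String pgType, int scale) {
        switch (pgType) {
            case "timestamp":   return "TIMESTAMP(" + scale + ")";
            case "timestamptz": return "TIMESTAMP(" + scale + ") WITH LOCAL TIME ZONE";
            case "time":        return "TIME(" + scale + ")";
            case "date":        return "DATE"; // DATE has no sub-second precision
            default:
                throw new UnsupportedOperationException("Unmapped type: " + pgType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toFlinkType("timestamp", 5)); // TIMESTAMP(5)
        System.out.println(toFlinkType("time", 0));      // TIME(0)
    }
}
```

This is why the test expectations in the diff change from `DataTypes.TIMESTAMP()` to `DataTypes.TIMESTAMP(5)` for a column declared `timestamp(5)` on the Postgres side.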
[flink] branch master updated: [FLINK-15349] add 'create catalog' DDL to blink planner
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new b92803d [FLINK-15349] add 'create catalog' DDL to blink planner b92803d is described below commit b92803d3bf7fd0d251f2b70afc70dccb7ff695ec Author: bowen.li AuthorDate: Mon Feb 17 20:13:23 2020 -0800 [FLINK-15349] add 'create catalog' DDL to blink planner add 'create catalog' DDL to blink planner closes #6. --- .../flink/table/client/cli/SqlCommandParser.java | 4 + .../table/client/cli/SqlCommandParserTest.java | 4 + .../src/main/codegen/data/Parser.tdd | 2 + .../src/main/codegen/includes/parserImpls.ftl | 24 + .../flink/sql/parser/ddl/SqlCreateCatalog.java | 109 + .../flink/sql/parser/FlinkSqlParserImplTest.java | 15 +++ .../table/api/internal/TableEnvironmentImpl.java | 11 +++ .../operations/ddl/CreateCatalogOperation.java | 62 .../operations/SqlToOperationConverter.java| 22 + .../flink/table/planner/catalog/CatalogITCase.java | 54 ++ 10 files changed, 307 insertions(+) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java index 1387d18..03e2f63 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java @@ -111,6 +111,10 @@ public final class SqlCommandParser { "USE\\s+(?!CATALOG)(.*)", SINGLE_OPERAND), + CREATE_CATALOG( + "(CREATE\\s+CATALOG\\s+.*)", + SINGLE_OPERAND), + DESCRIBE( "DESCRIBE\\s+(.*)", SINGLE_OPERAND), diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java index 54e0a7d..2509c3c 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java @@ -83,6 +83,10 @@ public class SqlCommandParserTest { testValidSqlCommand("reset;", new SqlCommandCall(SqlCommand.RESET)); testValidSqlCommand("source /my/file", new SqlCommandCall(SqlCommand.SOURCE, new String[] {"/my/file"})); testInvalidSqlCommand("source"); // missing path + testValidSqlCommand("create CATALOG c1", + new SqlCommandCall(SqlCommand.CREATE_CATALOG, new String[]{"create CATALOG c1"})); + testValidSqlCommand("create CATALOG c1 WITH ('k'='v')", + new SqlCommandCall(SqlCommand.CREATE_CATALOG, new String[]{"create CATALOG c1 WITH ('k'='v')"})); testValidSqlCommand("USE CATALOG default", new SqlCommandCall(SqlCommand.USE_CATALOG, new String[]{"default"})); testValidSqlCommand("use default", new SqlCommandCall(SqlCommand.USE, new String[] {"default"})); testInvalidSqlCommand("use catalog"); diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 7f4011a..1783a5e 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -43,6 +43,7 @@ "org.apache.flink.sql.parser.ddl.SqlAlterTable", "org.apache.flink.sql.parser.ddl.SqlAlterTableRename", "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties", +"org.apache.flink.sql.parser.ddl.SqlCreateCatalog", "org.apache.flink.sql.parser.dml.RichSqlInsert", "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword", "org.apache.flink.sql.parser.dql.SqlShowCatalogs", @@ -453,6 +454,7 @@ # List of methods for parsing extensions to "CREATE [OR REPLACE]" calls. 
# Each must accept arguments "(SqlParserPos pos, boolean replace)". createStatementParserMethods: [ +"SqlCreateCatalog", "SqlCreateTable", "SqlCreateView", "SqlCreateDatabase", diff --git a/f
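The SQL Client change above recognizes `CREATE CATALOG` with a regex whose single capture group is the whole statement (`SINGLE_OPERAND`), leaving parsing of the statement body to the planner. A self-contained sketch of that matching, using the same pattern string as the diff (the flag choices and helper method are illustrative assumptions, not the exact `SqlCommandParser` code):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch: the CREATE_CATALOG command is detected with a
// case-insensitive regex; the single capture group hands the full statement
// through unmodified, so the planner does the real parsing.
public class CreateCatalogParserSketch {

    static final Pattern CREATE_CATALOG =
        Pattern.compile("(CREATE\\s+CATALOG\\s+.*)", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    // Returns the statement operand when it is a CREATE CATALOG call,
    // or null for any other statement.
    static String parse(String stmt) {
        Matcher m = CREATE_CATALOG.matcher(stmt.trim());
        return m.matches() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(parse("create CATALOG c1 WITH ('k'='v')"));
        System.out.println(parse("USE CATALOG c1")); // null: not CREATE CATALOG
    }
}
```

This mirrors the new test cases in `SqlCommandParserTest`, where both `create CATALOG c1` and `create CATALOG c1 WITH ('k'='v')` map to `SqlCommand.CREATE_CATALOG` with the full statement as operand.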
[flink] branch master updated (e3d5820 -> b6ec10f)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from e3d5820 [FLINK-15473][core] Add ppc64le to the list of known processor architectures. add b6ec10f [FLINK-16055][hive] Avoid catalog functions when listing Hive built-in functions No new revisions were added by this update. Summary of changes: .../table/catalog/hive/client/HiveShimV120.java| 26 +- 1 file changed, 15 insertions(+), 11 deletions(-)
[flink] branch master updated (736ebc0 -> 63e9e9d)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 736ebc0 [hotfix][build] Remove various unused test dependencies add 63e9e9d [FLINK-14802][orc][hive] Multi vectorized read version support for hive orc read No new revisions were added by this update. Summary of changes: docs/dev/table/hive/index.md | 26 +++-- docs/dev/table/hive/index.zh.md| 26 +++-- docs/dev/table/hive/read_write_hive.md | 1 - docs/dev/table/hive/read_write_hive.zh.md | 1 - flink-connectors/flink-connector-hive/pom.xml | 67 +++ .../connectors/hive/read/HiveTableInputFormat.java | 5 - .../hive/read/HiveVectorizedOrcSplitReader.java| 39 --- .../{flink-orc => flink-orc-nohive}/pom.xml| 54 +++-- .../flink/orc/nohive/OrcNoHiveSplitReaderUtil.java | 93 +++ .../flink/orc/nohive/shim/OrcNoHiveShim.java | 96 +++ .../nohive/vector/AbstractOrcNoHiveVector.java}| 44 +++ .../orc/nohive/vector/OrcNoHiveBatchWrapper.java | 45 +++ .../orc/nohive/vector/OrcNoHiveBytesVector.java| 44 +++ .../orc/nohive/vector/OrcNoHiveDecimalVector.java | 46 .../orc/nohive/vector/OrcNoHiveDoubleVector.java | 46 .../orc/nohive/vector/OrcNoHiveLongVector.java | 65 +++ .../nohive/vector/OrcNoHiveTimestampVector.java| 48 .../OrcColumnarRowSplitReaderNoHiveTest.java | 121 +++ flink-formats/flink-orc/pom.xml| 12 ++ .../flink/orc/OrcColumnarRowSplitReader.java | 15 ++- .../java/org/apache/flink/orc/OrcInputFormat.java | 4 +- .../org/apache/flink/orc/OrcRowSplitReader.java| 5 +- .../java/org/apache/flink/orc/OrcSplitReader.java | 14 +-- .../org/apache/flink/orc/OrcSplitReaderUtil.java | 43 --- .../java/org/apache/flink/orc/shim/OrcShim.java| 11 +- .../org/apache/flink/orc/shim/OrcShimV200.java | 10 +- .../flink/orc/vector/AbstractOrcColumnVector.java | 6 +- .../flink/orc/vector/HiveOrcBatchWrapper.java | 43 +++ .../orc/vector/OrcVectorizedBatchWrapper.java | 35 ++ .../flink/orc/OrcColumnarRowSplitReaderTest.java | 130 
+++-- flink-formats/pom.xml | 1 + tools/travis/stage.sh | 1 + 32 files changed, 1002 insertions(+), 195 deletions(-) copy flink-formats/{flink-orc => flink-orc-nohive}/pom.xml (70%) create mode 100644 flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveSplitReaderUtil.java create mode 100644 flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/shim/OrcNoHiveShim.java copy flink-formats/{flink-orc/src/main/java/org/apache/flink/orc/vector/AbstractOrcColumnVector.java => flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/vector/AbstractOrcNoHiveVector.java} (80%) create mode 100644 flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/vector/OrcNoHiveBatchWrapper.java create mode 100644 flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/vector/OrcNoHiveBytesVector.java create mode 100644 flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/vector/OrcNoHiveDecimalVector.java create mode 100644 flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/vector/OrcNoHiveDoubleVector.java create mode 100644 flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/vector/OrcNoHiveLongVector.java create mode 100644 flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/vector/OrcNoHiveTimestampVector.java create mode 100644 flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java create mode 100644 flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/HiveOrcBatchWrapper.java create mode 100644 flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/OrcVectorizedBatchWrapper.java
[flink] branch release-1.10 updated: [FLINK-15933][catalog][doc] Update content of how generic table schema is stored in hive via HiveCatalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 220fc5b [FLINK-15933][catalog][doc] Update content of how generic table schema is stored in hive via HiveCatalog 220fc5b is described below commit 220fc5b6c04ac3c4d9a59b33177092136b9006b1 Author: Rui Li AuthorDate: Thu Feb 6 16:14:35 2020 +0800 [FLINK-15933][catalog][doc] Update content of how generic table schema is stored in hive via HiveCatalog closes #11029 --- docs/dev/table/hive/hive_catalog.md| 155 - docs/dev/table/hive/hive_catalog.zh.md | 155 - docs/dev/table/hive/index.md | 95 docs/dev/table/hive/index.zh.md| 96 4 files changed, 302 insertions(+), 199 deletions(-) diff --git a/docs/dev/table/hive/hive_catalog.md b/docs/dev/table/hive/hive_catalog.md index e5285a1..92b05cb 100644 --- a/docs/dev/table/hive/hive_catalog.md +++ b/docs/dev/table/hive/hive_catalog.md @@ -52,6 +52,21 @@ as those of an overall Flink-Hive integration. Once configured properly, `HiveCatalog` should just work out of box. Users can create Flink meta-objects with DDL, and shoud see them immediately afterwards. +`HiveCatalog` can be used to handle two kinds of tables: Hive-compatible tables and generic tables. Hive-compatible tables +are those stored in a Hive-compatible way, in terms of both metadata and data in the storage layer. Therefore, Hive-compatible tables +created via Flink can be queried from Hive side. + +Generic tables, on the other hand, are specific to Flink. When creating generic tables with `HiveCatalog`, we're just using +HMS to persist the metadata. While these tables are visible to Hive, it's unlikely Hive is able to understand +the metadata. And therefore using such tables in Hive leads to undefined behavior. + +Flink uses the property '*is_generic*' to tell whether a table is Hive-compatible or generic. 
When creating a table with +`HiveCatalog`, it's by default considered generic. If you'd like to create a Hive-compatible table, make sure to set +`is_generic` to false in your table properties. + +As stated above, generic tables shouldn't be used from Hive. In Hive CLI, you can call `DESCRIBE FORMATTED` for a table and +decide whether it's generic or not by checking the `is_generic` property. Generic tables will have `is_generic=true`. + ### Example We will walk through a simple example here. @@ -190,7 +205,7 @@ root {% endhighlight %} -Verify the table is also visible to Hive via Hive Cli: +Verify the table is also visible to Hive via Hive Cli, and note that the table has property `is_generic=true`: {% highlight bash %} hive> show tables; @@ -198,10 +213,47 @@ OK mykafka Time taken: 0.038 seconds, Fetched: 1 row(s) -{% endhighlight %} +hive> describe formatted mykafka; +OK +# col_name data_type comment + + +# Detailed Table Information +Database: default +Owner: null +CreateTime:.. +LastAccessTime:UNKNOWN +Retention: 0 +Location: .. +Table Type:MANAGED_TABLE +Table Parameters: + flink.connector.properties.bootstrap.serverslocalhost:9092 + flink.connector.properties.zookeeper.connectlocalhost:2181 + flink.connector.topic test + flink.connector.typekafka + flink.connector.version universal + flink.format.type csv + flink.generic.table.schema.0.data-type VARCHAR(2147483647) + flink.generic.table.schema.0.name name + flink.generic.table.schema.1.data-type INT + flink.generic.table.schema.1.name age + flink.update-mode append + is_generic true + transient_lastDdlTime .. 
+ +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat +Compressed:No +Num Buckets: -1 +Bucket Columns:[] +Sort Columns: [] +Storage Desc Params: + serialization.format1 +Time taken: 0.158 seconds, Fetched: 36 row(s) -Please note since this is a generic table, Hive doesn't understand such a table and using this table in -Hive leads to undefined behavior. +{% endhighlight %} step 5: run Flink SQL to query the Kafka table @@ -243,3 +295,98 @@ You should see results produced by Flink in SQL Client now, as: kaiky18 {% endhighlight %} + +## Supported Types + +`HiveCatalog` supports all Flink types for generic tables. + +For
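The generic-vs-Hive-compatible distinction documented by this commit can be sketched in Flink DDL. This is an illustrative sketch only: the table names and connector properties below are hypothetical; the `is_generic` flag itself is the table property described in the doc changes above.

```sql
-- Generic table (the default): HiveCatalog persists the metadata in HMS,
-- but the table is only meaningful to Flink.
CREATE TABLE kafka_events (
  name STRING,
  age INT
) WITH (
  'connector.type' = 'kafka'   -- hypothetical connector properties
);

-- Hive-compatible table: set is_generic to false so the table is stored
-- in a Hive-compatible way and can also be queried from the Hive side.
CREATE TABLE hive_events (
  name STRING,
  age INT
) WITH (
  'is_generic' = 'false'
);
```

As the doc text notes, a `DESCRIBE FORMATTED` in the Hive CLI will show `is_generic=true` for the first kind of table and should not be queried from Hive.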
[flink] branch master updated (5614f21 -> 3766daa)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 5614f21 [FLINK-15546][table-planner-blink] Fix obscure error message from ScalarOperatorGens::generateCast (#11021) add 3766daa [FLINK-15933][catalog][doc] Update content of how generic table schema is stored in hive via HiveCatalog No new revisions were added by this update. Summary of changes: docs/dev/table/hive/hive_catalog.md| 155 - docs/dev/table/hive/hive_catalog.zh.md | 155 - docs/dev/table/hive/index.md | 95 docs/dev/table/hive/index.zh.md| 96 4 files changed, 302 insertions(+), 199 deletions(-)
[flink] branch release-1.10 updated: [hotfix][docs] Fix missing double quotes in catalog docs
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 92d130c [hotfix][docs] Fix missing double quotes in catalog docs 92d130c is described below commit 92d130c7cad8e76d6cf77906394bdba1a48434dd Author: Tartarus AuthorDate: Tue Feb 4 12:01:08 2020 +0800 [hotfix][docs] Fix missing double quotes in catalog docs closes #11008 --- docs/dev/table/catalogs.md| 2 +- docs/dev/table/catalogs.zh.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/dev/table/catalogs.md b/docs/dev/table/catalogs.md index 6fb2b47..876e808 100644 --- a/docs/dev/table/catalogs.md +++ b/docs/dev/table/catalogs.md @@ -134,7 +134,7 @@ catalog.createTable( ) ); -List tables = catalog.listTables("mydb); // tables should contain "mytable" +List tables = catalog.listTables("mydb"); // tables should contain "mytable" {% endhighlight %} diff --git a/docs/dev/table/catalogs.zh.md b/docs/dev/table/catalogs.zh.md index e07e840..1814f5c 100644 --- a/docs/dev/table/catalogs.zh.md +++ b/docs/dev/table/catalogs.zh.md @@ -134,7 +134,7 @@ catalog.createTable( ) ); -List tables = catalog.listTables("mydb); // tables should contain "mytable" +List tables = catalog.listTables("mydb"); // tables should contain "mytable" {% endhighlight %}
[flink] branch release-1.10 updated (73c2812 -> 6afd287)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 73c2812 [hotfix][tests] Fix incorrect version reference bug during 1.10 backport add 6afd287 [FLINK-15592][hive] Add black list for Hive built-in functions No new revisions were added by this update. Summary of changes: .../apache/flink/table/module/hive/HiveModule.java | 23 +--- .../flink/table/module/hive/HiveModuleTest.java| 41 -- 2 files changed, 49 insertions(+), 15 deletions(-)
[flink] branch master updated (f7a7d89 -> 32b7a9d)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f7a7d89 [FLINK-15691][tests] Remove runInTaskExecutorThreadAndWait() add 32b7a9d [FLINK-15592][hive] Add black list for Hive built-in functions No new revisions were added by this update. Summary of changes: .../apache/flink/table/module/hive/HiveModule.java | 23 +--- .../flink/table/module/hive/HiveModuleTest.java| 41 -- 2 files changed, 49 insertions(+), 15 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15590][doc] add section for current catalog and current database
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 4e39323 [FLINK-15590][doc] add section for current catalog and current database 4e39323 is described below commit 4e39323b9c52fcd30019f8be31ea5f37d7595e1d Author: bowen.li AuthorDate: Tue Jan 14 17:30:01 2020 -0800 [FLINK-15590][doc] add section for current catalog and current database --- docs/dev/table/common.md| 13 + docs/dev/table/common.zh.md | 13 + 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md index 6c7a3ed..eabc86a 100644 --- a/docs/dev/table/common.md +++ b/docs/dev/table/common.md @@ -297,7 +297,7 @@ Create Tables in the Catalog A `TableEnvironment` maintains a map of catalogs of tables which are created with an identifier. Each identifier consists of 3 parts: catalog name, database name and object name. If a catalog or database is not -specified, the current default value will be used (see examples in the [Table identifier expanding](#table-identifier-expanding) section). +specified, the current default value will be used (see examples in the [Table identifier expanding]({{ site.baseurl }}/dev/table/common.html#table-identifier-expanding) section). Tables can be either virtual (`VIEWS`) or regular (`TABLES`). `VIEWS` can be created from an existing `Table` object, usually the result of a Table API or SQL query. `TABLES` describe @@ -433,9 +433,14 @@ tableEnvironment.sqlUpdate("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)") ### Expanding Table identifiers -Tables are always registered with a 3 part identifier consisting of catalog, database, and -table name. The first two parts are optional and if they are not provided the set default values will -be used. 
Identifiers follow SQL requirements which means that they can be escaped with a backtick character (`` ` ``). +Tables are always registered with a 3-part identifier consisting of catalog, database, and table name. + +Users can set one catalog and one database inside it to be the “current catalog” and “current database”. +With them, the first two parts in the 3-part identifier mentioned above can be optional - if they are not provided, +the current catalog and current database will be used. Users can switch the current catalog and current database via +Table API or SQL. + +Identifiers follow SQL requirements which means that they can be escaped with a backtick character (`` ` ``). Additionally all SQL reserved keywords must be escaped. diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md index 6c7a3ed..eabc86a 100644 --- a/docs/dev/table/common.zh.md +++ b/docs/dev/table/common.zh.md @@ -297,7 +297,7 @@ Create Tables in the Catalog A `TableEnvironment` maintains a map of catalogs of tables which are created with an identifier. Each identifier consists of 3 parts: catalog name, database name and object name. If a catalog or database is not -specified, the current default value will be used (see examples in the [Table identifier expanding](#table-identifier-expanding) section). +specified, the current default value will be used (see examples in the [Table identifier expanding]({{ site.baseurl }}/dev/table/common.html#table-identifier-expanding) section). Tables can be either virtual (`VIEWS`) or regular (`TABLES`). `VIEWS` can be created from an existing `Table` object, usually the result of a Table API or SQL query. `TABLES` describe @@ -433,9 +433,14 @@ tableEnvironment.sqlUpdate("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)") ### Expanding Table identifiers -Tables are always registered with a 3 part identifier consisting of catalog, database, and -table name.
The first two parts are optional and if they are not provided the set default values will -be used. Identifiers follow SQL requirements which means that they can be escaped with a backtick character (`` ` ``). +Tables are always registered with a 3-part identifier consisting of catalog, database, and table name. + +Users can set one catalog and one database inside it to be the “current catalog” and “current database”. +With them, the first two parts in the 3-part identifier mentioned above can be optional - if they are not provided, +the current catalog and current database will be used. Users can switch the current catalog and current database via +Table API or SQL. + +Identifiers follow SQL requirements which means that they can be escaped with a backtick character (`` ` ``). Additionally all SQL reserved keywords must be escaped.
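The "current catalog" and "current database" behavior added to the docs in this commit can be illustrated with a short SQL sketch (the catalog, database, and table names here are made up):

```sql
-- Switch the current catalog, and the current database inside it.
USE CATALOG myhive;
USE mydatabase;

-- With the current settings in place, a bare table name resolves
-- against them, so these two queries refer to the same table:
SELECT * FROM mytable;
SELECT * FROM myhive.mydatabase.mytable;
```

The fully qualified 3-part form always works regardless of the current catalog and database settings.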
[flink] branch master updated (6bcb9bf -> 2231e1b)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 6bcb9bf [FLINK-15589][doc] remove beta tag from catalog and hive doc add 2231e1b [FLINK-15590][doc] add section for current catalog and current database No new revisions were added by this update. Summary of changes: docs/dev/table/common.md| 13 + docs/dev/table/common.zh.md | 13 + 2 files changed, 18 insertions(+), 8 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15589][doc] remove beta tag from catalog and hive doc
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new af93ca2 [FLINK-15589][doc] remove beta tag from catalog and hive doc af93ca2 is described below commit af93ca2502e8f083fda99aea7b6e720a597d2a0a Author: bowen.li AuthorDate: Tue Jan 14 17:11:35 2020 -0800 [FLINK-15589][doc] remove beta tag from catalog and hive doc closes #10856 --- docs/dev/table/catalogs.md | 1 - docs/dev/table/catalogs.zh.md | 1 - docs/dev/table/hive/index.md| 1 - docs/dev/table/hive/index.zh.md | 1 - 4 files changed, 4 deletions(-) diff --git a/docs/dev/table/catalogs.md b/docs/dev/table/catalogs.md index 07f5958..79972e1 100644 --- a/docs/dev/table/catalogs.md +++ b/docs/dev/table/catalogs.md @@ -1,6 +1,5 @@ --- title: "Catalogs" -is_beta: true nav-parent_id: tableapi nav-pos: 80 --- diff --git a/docs/dev/table/catalogs.zh.md b/docs/dev/table/catalogs.zh.md index e0897f6..e07e840 100644 --- a/docs/dev/table/catalogs.zh.md +++ b/docs/dev/table/catalogs.zh.md @@ -1,6 +1,5 @@ --- title: "Catalogs" -is_beta: true nav-parent_id: tableapi nav-pos: 80 --- diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index c054843..f15b280 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -3,7 +3,6 @@ title: "Hive Integration" nav-id: hive_tableapi nav-parent_id: tableapi nav-pos: 100 -is_beta: true nav-show_overview: true ---
[flink] branch master updated (7513d9b -> 6bcb9bf)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 7513d9b [FLINK-15517][Runtime][Configuration][Network] Use back 'network' in 'shuffle' memory config option names add 6bcb9bf [FLINK-15589][doc] remove beta tag from catalog and hive doc No new revisions were added by this update. Summary of changes: docs/dev/table/catalogs.md | 1 - docs/dev/table/catalogs.zh.md | 1 - docs/dev/table/hive/index.md| 1 - docs/dev/table/hive/index.zh.md | 1 - 4 files changed, 4 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15576] remove isTemporary property from CatalogFunction API
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 7667d43 [FLINK-15576] remove isTemporary property from CatalogFunction API 7667d43 is described below commit 7667d43cf216184818dac56d2c243c0c56f31ef1 Author: bowen.li AuthorDate: Mon Jan 13 13:28:00 2020 -0800 [FLINK-15576] remove isTemporary property from CatalogFunction API according to FLIP-79, CatalogFunction shouldn't have "isTemporary" property. Moving that from CatalogFunction to Create/AlterCatalogFunctionOperation closes #10846. --- flink-python/pyflink/table/catalog.py | 7 --- flink-python/pyflink/table/tests/test_catalog.py| 1 - .../flink/table/api/internal/TableEnvironmentImpl.java | 4 ++-- .../apache/flink/table/catalog/CatalogFunctionImpl.java | 17 - .../table/operations/ddl/AlterFunctionOperation.java| 10 +- .../table/operations/ddl/CreateFunctionOperation.java | 10 +- .../flink/table/catalog/GenericInMemoryCatalogTest.java | 2 +- .../org/apache/flink/table/catalog/CatalogFunction.java | 7 --- .../planner/operations/SqlToOperationConverter.java | 12 ++-- .../planner/runtime/stream/sql/FunctionITCase.java | 2 +- .../flink/table/sqlexec/SqlToOperationConverter.java| 17 - .../apache/flink/table/api/internal/TableEnvImpl.scala | 4 ++-- .../flink/table/runtime/stream/sql/FunctionITCase.java | 2 +- 13 files changed, 43 insertions(+), 52 deletions(-) diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py index 532c132..aac8dcc 100644 --- a/flink-python/pyflink/table/catalog.py +++ b/flink-python/pyflink/table/catalog.py @@ -797,13 +797,6 @@ class CatalogFunction(object): """ return self._j_catalog_function.isGeneric() -def is_temporary(self): -""" -Wheter or not the function is a temporary function. -:return: Wheter is a temporary function. 
-""" -return self._j_catalog_function.isTemporary() - def get_function_language(self): """ Get the language used for the function definition. diff --git a/flink-python/pyflink/table/tests/test_catalog.py b/flink-python/pyflink/table/tests/test_catalog.py index 638f1bf..ed0164b 100644 --- a/flink-python/pyflink/table/tests/test_catalog.py +++ b/flink-python/pyflink/table/tests/test_catalog.py @@ -68,7 +68,6 @@ class CatalogTestBase(PyFlinkTestCase): def check_catalog_function_equals(self, f1, f2): self.assertEqual(f1.get_class_name(), f2.get_class_name()) self.assertEqual(f1.is_generic(), f2.is_generic()) -self.assertEqual(f1.is_temporary(), f2.is_temporary()) self.assertEqual(f1.get_function_language(), f2.get_function_language()) def check_catalog_partition_equals(self, p1, p2): diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 748c339..6ca0553 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -753,7 +753,7 @@ public class TableEnvironmentImpl implements TableEnvironment { String exMsg = getDDLOpExecuteErrorMsg(createFunctionOperation.asSummaryString()); try { CatalogFunction function = createFunctionOperation.getCatalogFunction(); - if (function.isTemporary()) { + if (createFunctionOperation.isTemporary()) { boolean exist = functionCatalog.hasTemporaryCatalogFunction( createFunctionOperation.getFunctionIdentifier()); if (!exist) { @@ -787,7 +787,7 @@ public class TableEnvironmentImpl implements TableEnvironment { String exMsg = getDDLOpExecuteErrorMsg(alterFunctionOperation.asSummaryString()); try { CatalogFunction function = alterFunctionOperation.getCatalogFunction(); - if 
(function.isTemporary()) { + if (alterFunctionOperation.isTemporary()) { throw new ValidationException( "Alter temporary catalog function is not supported"); } else { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/
[flink] branch master updated (8f67d1d -> 9611de4)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 8f67d1d [hotfix] remove isSystemFunction from DropCatalogFunctionOperation as it's never used add 9611de4 [FLINK-15576] remove isTemporary property from CatalogFunction API No new revisions were added by this update. Summary of changes: flink-python/pyflink/table/catalog.py | 7 --- flink-python/pyflink/table/tests/test_catalog.py| 1 - .../flink/table/api/internal/TableEnvironmentImpl.java | 4 ++-- .../apache/flink/table/catalog/CatalogFunctionImpl.java | 17 - .../operations/ddl/AlterCatalogFunctionOperation.java | 10 +- .../operations/ddl/CreateCatalogFunctionOperation.java | 10 +- .../flink/table/catalog/GenericInMemoryCatalogTest.java | 2 +- .../org/apache/flink/table/catalog/CatalogFunction.java | 7 --- .../planner/operations/SqlToOperationConverter.java | 12 ++-- .../planner/runtime/stream/sql/FunctionITCase.java | 2 +- .../flink/table/sqlexec/SqlToOperationConverter.java| 17 - .../apache/flink/table/api/internal/TableEnvImpl.scala | 4 ++-- .../flink/table/runtime/stream/sql/FunctionITCase.java | 2 +- 13 files changed, 43 insertions(+), 52 deletions(-)
[flink] branch release-1.10 updated (6ffe2cc -> 12bb8c8)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 6ffe2cc [FLINK-15533] Deprecate text sink facilitation methods in DataStream add 12bb8c8 [hotfix] remove isSystemFunction from DropCatalogFunctionOperation as it's never used No new revisions were added by this update. Summary of changes: .../flink/table/operations/ddl/DropFunctionOperation.java| 12 ++-- .../table/planner/operations/SqlToOperationConverter.java| 5 ++--- 2 files changed, 4 insertions(+), 13 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15464][hive] Fix HiveTableSourceTest::testPartitionFilterDateTimestamp for 1.x
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 3a13e1b [FLINK-15464][hive] Fix HiveTableSourceTest::testPartitionFilterDateTimestamp for 1.x 3a13e1b is described below commit 3a13e1bf586b011e4018ab328190de308e1d82fd Author: Rui Li AuthorDate: Fri Jan 3 14:31:29 2020 +0800 [FLINK-15464][hive] Fix HiveTableSourceTest::testPartitionFilterDateTimestamp for 1.x closes #10757. --- .../java/org/apache/flink/connectors/hive/HiveTableSourceTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java index 520b2f2..18cdea2 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java @@ -294,11 +294,11 @@ public class HiveTableSourceTest { try { hiveShell.execute("create table db1.part(x int) partitioned by (p1 date,p2 timestamp)"); HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part") - .addRow(new Object[]{1}).commit("p1=date '2018-08-08',p2=timestamp '2018-08-08 08:08:08'"); + .addRow(new Object[]{1}).commit("p1='2018-08-08',p2='2018-08-08 08:08:08'"); HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part") - .addRow(new Object[]{2}).commit("p1=date '2018-08-09',p2=timestamp '2018-08-08 08:08:09'"); + .addRow(new Object[]{2}).commit("p1='2018-08-09',p2='2018-08-08 08:08:09'"); HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part") - .addRow(new Object[]{3}).commit("p1=date '2018-08-10',p2=timestamp '2018-08-08 08:08:10'"); + 
.addRow(new Object[]{3}).commit("p1='2018-08-10',p2='2018-08-08 08:08:10'"); TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(); TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
[flink] branch master updated (21aff32 -> 8fbab0d)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 21aff32 [FLINK-15515][hive][doc] Document that Hive connector should be used with blink planner add 8fbab0d [FLINK-15464][hive] Fix HiveTableSourceTest::testPartitionFilterDateTimestamp for 1.x No new revisions were added by this update. Summary of changes: .../java/org/apache/flink/connectors/hive/HiveTableSourceTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15515][hive][doc] Document that Hive connector should be used with blink planner
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 87d566f [FLINK-15515][hive][doc] Document that Hive connector should be used with blink planner 87d566f is described below commit 87d566f967fc3e4ded0150b57c87d7df035e447a Author: Rui Li AuthorDate: Thu Jan 9 20:21:50 2020 +0800 [FLINK-15515][hive][doc] Document that Hive connector should be used with blink planner closes #10813. --- docs/dev/table/hive/index.md | 9 + docs/dev/table/hive/index.zh.md| 9 + docs/dev/table/hive/read_write_hive.md | 1 + docs/dev/table/hive/read_write_hive.zh.md | 1 + docs/dev/table/hive/scala_shell_hive.md| 2 ++ docs/dev/table/hive/scala_shell_hive.zh.md | 2 ++ 6 files changed, 24 insertions(+) diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index 179f718..c054843 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -308,12 +308,18 @@ and [HiveCatalog]({{ site.baseurl }}/dev/table/hive/hive_catalog.html) through t If the `hive-conf/hive-site.xml` file is stored in a remote storage system, users should download the Hive configuration file to their local environment first. +Please note that while HiveCatalog doesn't require a particular planner, reading/writing Hive tables only works with the blink planner. +Therefore it's highly recommended that you use the blink planner when connecting to your Hive warehouse.
+ + Take Hive version 2.3.4 for example: {% highlight java %} +EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); +TableEnvironment tableEnv = TableEnvironment.create(settings); + String name= "myhive"; String defaultDatabase = "mydatabase"; String hiveConfDir = "/opt/hive-conf"; // a local path @@ -329,6 +335,9 @@ tableEnv.useCatalog("myhive"); {% highlight scala %} +val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() +val tableEnv = TableEnvironment.create(settings) + val name= "myhive" val defaultDatabase = "mydatabase" val hiveConfDir = "/opt/hive-conf" // a local path diff --git a/docs/dev/table/hive/index.zh.md b/docs/dev/table/hive/index.zh.md index 179f718..c054843 100644 --- a/docs/dev/table/hive/index.zh.md +++ b/docs/dev/table/hive/index.zh.md @@ -308,12 +308,18 @@ and [HiveCatalog]({{ site.baseurl }}/dev/table/hive/hive_catalog.html) through t If the `hive-conf/hive-site.xml` file is stored in a remote storage system, users should download the Hive configuration file to their local environment first. +Please note that while HiveCatalog doesn't require a particular planner, reading/writing Hive tables only works with the blink planner. +Therefore it's highly recommended that you use the blink planner when connecting to your Hive warehouse.
+ + Take Hive version 2.3.4 for example: {% highlight java %} +EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); +TableEnvironment tableEnv = TableEnvironment.create(settings); + String name= "myhive"; String defaultDatabase = "mydatabase"; String hiveConfDir = "/opt/hive-conf"; // a local path @@ -329,6 +335,9 @@ tableEnv.useCatalog("myhive"); {% highlight scala %} +val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() +val tableEnv = TableEnvironment.create(settings) + val name= "myhive" val defaultDatabase = "mydatabase" val hiveConfDir = "/opt/hive-conf" // a local path diff --git a/docs/dev/table/hive/read_write_hive.md b/docs/dev/table/hive/read_write_hive.md index f601a22..3528974 100644 --- a/docs/dev/table/hive/read_write_hive.md +++ b/docs/dev/table/hive/read_write_hive.md @@ -24,6 +24,7 @@ under the License. Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write Hive data as an alternative to Hive's batch engine. Be sure to follow the instructions to include the correct [dependencies]({{ site.baseurl }}/dev/table/hive/#depedencies) in your application. +Please also note that the Hive connector only works with the blink planner. * This will be replaced by the TOC {:toc} diff --git a/docs/dev/table/hive/read_write_hive.zh.md b/docs/dev/table/hive/read_write_hive.zh.md index f601a22..3528974 100644 --- a/docs/dev/table/hive/read_write_hive.zh.md +++ b/docs/dev/table/hive/read_write_hive.zh.md @@ -24,6 +24,7 @@ under the License. Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write Hive data as an alternative to Hive
[flink] branch master updated (27c29f7 -> 21aff32)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 27c29f7 [FLINK-15387][rocksdb][metrics] Expose missing RocksDB properties out via RocksDBNativeMetricOptions add 21aff32 [FLINK-15515][hive][doc] Document that Hive connector should be used with blink planner No new revisions were added by this update. Summary of changes: docs/dev/table/hive/index.md | 9 + docs/dev/table/hive/index.zh.md| 9 + docs/dev/table/hive/read_write_hive.md | 1 + docs/dev/table/hive/read_write_hive.zh.md | 1 + docs/dev/table/hive/scala_shell_hive.md| 2 ++ docs/dev/table/hive/scala_shell_hive.zh.md | 2 ++ 6 files changed, 24 insertions(+)
[flink] branch release-1.10 updated: [FLINK-15453][hive] Remove unneeded HiveShim methods
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 15c58a8 [FLINK-15453][hive] Remove unneeded HiveShim methods 15c58a8 is described below commit 15c58a889f7dd6d4d90f428aeb10b2c0f4477e62 Author: Rui Li AuthorDate: Fri Jan 3 13:22:28 2020 +0800 [FLINK-15453][hive] Remove unneeded HiveShim methods closes #10755 --- .../flink/table/catalog/hive/client/HiveShim.java | 46 --- .../table/catalog/hive/client/HiveShimV100.java| 137 - .../table/catalog/hive/client/HiveShimV230.java| 16 --- .../table/catalog/hive/client/HiveShimV310.java| 30 - .../functions/hive/conversion/HiveInspectors.java | 99 ++- 5 files changed, 98 insertions(+), 230 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java index cd6bc0b..346a8e1 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java @@ -22,8 +22,6 @@ import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -38,19 +36,16 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.thrift.TException; import javax.annotation.Nullable; -import java.io.IOException; import java.io.Serializable; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -80,23 +75,6 @@ public interface HiveShim extends Serializable { List getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException; /** -* Moves a particular file or directory to trash. -* The file/directory can potentially be deleted (w/o going to trash) if purge is set to true, or if it cannot -* be moved properly. -* -* This interface is here because FileUtils.moveToTrash in different Hive versions have different signatures. -* -* @param fsthe FileSystem to use -* @param path the path of the file or directory to be moved to trash. -* @param conf the Configuration to use -* @param purge whether try to skip trash and directly delete the file/directory. This flag may be ignored by -* old Hive versions prior to 2.3.0. -* @return true if the move is successful, and false otherwise -* @throws IOException if the file/directory cannot be properly moved or deleted -*/ - boolean moveToTrash(FileSystem fs, Path path, Configuration conf, boolean purge) throws IOException; - - /** * Alters a Hive table. * * @param client the Hive metastore client @@ -145,30 +123,6 @@ public interface HiveShim extends Serializable { Class getTimestampDataTypeClass(); /** -* The return type of HiveStatsUtils.getFileStatusRecurse was changed from array to List in Hive 3.1.0. 
-* -* @param path the path of the directory -* @param level the level of recursion -* @param fs the file system of the directory -* @return an array of the entries -* @throws IOException in case of any io error -*/ - FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) throws IOException; - - /** -* The signature of HiveStatsUtils.makeSpecFromName() was changed in Hive 3.1.0. -* -* @param partSpec partition specs -* @param currPath the current path -*/ - void makeSpecFromName(Map partSpec, Path currPath); - - /** -* Get ObjectInspector for a constant value. -*/ - ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo primitiveTypeInfo, Object value
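The commit above removes version-specific methods from the `HiveShim` interface. The shim pattern itself — one interface, one implementation per supported Hive version, and a loader that picks the best match for the running version — can be sketched without any Hive dependency. All class and method names below are illustrative stand-ins, not Flink's actual shim API, and version strings are compared lexicographically, which is fine only for the sample versions used here.

```java
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch of the shim pattern behind HiveShim: newer shims override
// behavior that changed in newer Hive versions, and the loader selects the
// newest shim not exceeding the requested version.
public class ShimDemo {

    interface Shim {
        String describeTimestampClass();
    }

    static class ShimV100 implements Shim {
        public String describeTimestampClass() { return "java.sql.Timestamp"; }
    }

    static class ShimV310 implements Shim {
        // Hive 3.1 introduced its own Timestamp type, so a newer shim overrides.
        public String describeTimestampClass() {
            return "org.apache.hadoop.hive.common.type.Timestamp";
        }
    }

    static final TreeMap<String, Shim> SHIMS = new TreeMap<>();
    static {
        SHIMS.put("1.0.0", new ShimV100());
        SHIMS.put("3.1.0", new ShimV310());
    }

    static Shim loadShim(String hiveVersion) {
        Map.Entry<String, Shim> e = SHIMS.floorEntry(hiveVersion);
        if (e == null) {
            throw new IllegalArgumentException("Unsupported Hive version: " + hiveVersion);
        }
        return e.getValue();
    }

    public static void main(String[] args) {
        System.out.println(loadShim("2.3.4").describeTimestampClass()); // java.sql.Timestamp
        System.out.println(loadShim("3.1.2").describeTimestampClass());
    }
}
```

Removing a method from the interface (as FLINK-15453 does) means deleting it from every versioned implementation at once, which is why unneeded shim methods are worth pruning aggressively.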
[flink] branch master updated (0a2cc58 -> 29a38a4)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 0a2cc58 [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI add 29a38a4 [FLINK-15453][hive] Remove unneeded HiveShim methods No new revisions were added by this update. Summary of changes: .../flink/table/catalog/hive/client/HiveShim.java | 46 --- .../table/catalog/hive/client/HiveShimV100.java| 137 - .../table/catalog/hive/client/HiveShimV230.java| 16 --- .../table/catalog/hive/client/HiveShimV310.java| 30 - .../functions/hive/conversion/HiveInspectors.java | 99 ++- 5 files changed, 98 insertions(+), 230 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 80cfeb8 [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI 80cfeb8 is described below commit 80cfeb84dfa479fd1a41dde8863f88eca8a796ca Author: Rui Li AuthorDate: Fri Jan 3 17:38:33 2020 +0800 [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI closes #10759. --- .../main/java/org/apache/flink/table/client/cli/CliClient.java | 10 ++ .../java/org/apache/flink/table/client/cli/CliStrings.java | 1 + .../org/apache/flink/table/client/cli/SqlCommandParser.java| 4 .../apache/flink/table/client/gateway/local/LocalExecutor.java | 4 ++-- .../java/org/apache/flink/table/client/cli/CliClientTest.java | 3 +++ .../apache/flink/table/client/cli/SqlCommandParserTest.java| 3 +++ 6 files changed, 19 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 8b80373..797bca6 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -224,11 +224,12 @@ public class CliClient { terminal.flush(); final Optional parsedStatement = parseCommand(statement); - // only support INSERT INTO + // only support INSERT INTO/OVERWRITE return parsedStatement.map(cmdCall -> { switch (cmdCall.command) { case INSERT_INTO: - return callInsertInto(cmdCall); + case INSERT_OVERWRITE: + return callInsert(cmdCall); default: printError(CliStrings.MESSAGE_UNSUPPORTED_SQL); return false; @@ -294,7 +295,8 @@ public class CliClient { callSelect(cmdCall); break; case INSERT_INTO: - callInsertInto(cmdCall); + case 
INSERT_OVERWRITE: + callInsert(cmdCall); break; case CREATE_TABLE: callCreateTable(cmdCall); @@ -528,7 +530,7 @@ public class CliClient { } } - private boolean callInsertInto(SqlCommandCall cmdCall) { + private boolean callInsert(SqlCommandCall cmdCall) { printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT); try { diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java index a04068f..77c7039 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java @@ -50,6 +50,7 @@ public final class CliStrings { .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name.")) .append(formatCommand(SqlCommand.HELP, "Prints the available commands.")) .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink.")) + .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")) .append(formatCommand(SqlCommand.QUIT, "Quits the SQL CLI client.")) .append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties.")) .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster.")) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java index a01b3c0..1387d18 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java @@ -127,6 +127,10 @@ public final 
class SqlCommandParser {
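The diff above renames `callInsertInto` to `callInsert` and routes both `INSERT_INTO` and `INSERT_OVERWRITE` through it via a switch fall-through. A self-contained sketch of that dispatch shape (enum values and handler names are simplified stand-ins for the real `SqlCommandParser`/`CliClient` classes):

```java
// Fall-through dispatch in the spirit of the CliClient patch: the new
// INSERT_OVERWRITE case falls through to the same handler as INSERT_INTO,
// while everything else is rejected in this mode.
public class DispatchDemo {

    enum SqlCommand { SELECT, INSERT_INTO, INSERT_OVERWRITE, CREATE_TABLE }

    static boolean callInsert(SqlCommand cmd) {
        // The real client submits the statement to the Flink cluster here.
        System.out.println("submitting " + cmd);
        return true;
    }

    static boolean handle(SqlCommand cmd) {
        switch (cmd) {
            case INSERT_INTO:
            case INSERT_OVERWRITE: // added case shares the INSERT_INTO handler
                return callInsert(cmd);
            default:
                System.out.println("unsupported in this mode: " + cmd);
                return false;
        }
    }

    public static void main(String[] args) {
        handle(SqlCommand.INSERT_OVERWRITE); // now handled like INSERT_INTO
        handle(SqlCommand.SELECT);           // still rejected
    }
}
```

The rename to `callInsert` keeps a single code path for both statement kinds, so future insert variants only need another `case` label.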
[flink] branch master updated: [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 0a2cc58 [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI 0a2cc58 is described below commit 0a2cc58af87a0154160322d0669fcb22c736bde2 Author: Rui Li AuthorDate: Fri Jan 3 17:38:33 2020 +0800 [FLINK-15468][sql-client] INSERT OVERWRITE not supported from SQL CLI closes #10759. --- .../main/java/org/apache/flink/table/client/cli/CliClient.java | 10 ++ .../java/org/apache/flink/table/client/cli/CliStrings.java | 1 + .../org/apache/flink/table/client/cli/SqlCommandParser.java| 4 .../apache/flink/table/client/gateway/local/LocalExecutor.java | 4 ++-- .../java/org/apache/flink/table/client/cli/CliClientTest.java | 3 +++ .../apache/flink/table/client/cli/SqlCommandParserTest.java| 3 +++ 6 files changed, 19 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 8b80373..797bca6 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -224,11 +224,12 @@ public class CliClient { terminal.flush(); final Optional parsedStatement = parseCommand(statement); - // only support INSERT INTO + // only support INSERT INTO/OVERWRITE return parsedStatement.map(cmdCall -> { switch (cmdCall.command) { case INSERT_INTO: - return callInsertInto(cmdCall); + case INSERT_OVERWRITE: + return callInsert(cmdCall); default: printError(CliStrings.MESSAGE_UNSUPPORTED_SQL); return false; @@ -294,7 +295,8 @@ public class CliClient { callSelect(cmdCall); break; case INSERT_INTO: - callInsertInto(cmdCall); + case 
INSERT_OVERWRITE: + callInsert(cmdCall); break; case CREATE_TABLE: callCreateTable(cmdCall); @@ -528,7 +530,7 @@ public class CliClient { } } - private boolean callInsertInto(SqlCommandCall cmdCall) { + private boolean callInsert(SqlCommandCall cmdCall) { printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT); try { diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java index a04068f..77c7039 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java @@ -50,6 +50,7 @@ public final class CliStrings { .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name.")) .append(formatCommand(SqlCommand.HELP, "Prints the available commands.")) .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink.")) + .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")) .append(formatCommand(SqlCommand.QUIT, "Quits the SQL CLI client.")) .append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties.")) .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster.")) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java index a01b3c0..1387d18 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java @@ -127,6 +127,10 @@ public final 
class SqlCommandParser {
[flink] branch release-1.9 updated (55044d3c -> bcf5400)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git. from 55044d3c [FLINK-15443][jdbc] Fix mismatch between Java float and JDBC float (#10731) add bcf5400 [FLINK-15259][hive] HiveInspector.toInspectors() should convert Flink constant to Hive constant No new revisions were added by this update. Summary of changes: .../flink/connectors/hive/FlinkHiveException.java | 4 + .../catalog/hive/util/HiveReflectionUtils.java | 43 + .../flink/table/functions/hive/HiveGenericUDF.java | 5 +- .../functions/hive/conversion/HiveInspectors.java | 176 - ...sion.java => WritableHiveObjectConversion.java} | 15 +- .../table/functions/hive/HiveGenericUDFTest.java | 27 .../table/functions/hive/HiveGenericUDTFTest.java | 9 +- 7 files changed, 227 insertions(+), 52 deletions(-) create mode 100644 flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java copy flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/{IdentityConversion.java => WritableHiveObjectConversion.java} (68%)
[flink] branch release-1.10 updated (0a89b9a -> 386d506)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 0a89b9a [FLINK-15437][yarn] Apply dynamic properties early on client side. add 386d506 [FLINK-15429][hive] HiveObjectConversion implementations need to handle null values No new revisions were added by this update. Summary of changes: .../flink/table/catalog/hive/client/HiveShim.java | 4 +-- .../table/catalog/hive/client/HiveShimV100.java| 6 + .../table/catalog/hive/client/HiveShimV310.java| 6 + .../functions/hive/conversion/HiveInspectors.java | 15 +-- .../hive/conversion/HiveObjectConversion.java | 4 ++- .../flink/connectors/hive/HiveTableSinkTest.java | 29 ++ .../flink/table/catalog/hive/HiveTestUtils.java| 4 ++- 7 files changed, 62 insertions(+), 6 deletions(-)
[flink] branch master updated (367765b -> 1c5806a)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 367765b [FLINK-15437][yarn] Apply dynamic properties early on client side. add 1c5806a [FLINK-15429][hive] HiveObjectConversion implementations need to handle null values No new revisions were added by this update. Summary of changes: .../flink/table/catalog/hive/client/HiveShim.java | 4 +-- .../table/catalog/hive/client/HiveShimV100.java| 6 + .../table/catalog/hive/client/HiveShimV310.java| 6 + .../functions/hive/conversion/HiveInspectors.java | 15 +-- .../hive/conversion/HiveObjectConversion.java | 4 ++- .../flink/connectors/hive/HiveTableSinkTest.java | 29 ++ .../flink/table/catalog/hive/HiveTestUtils.java| 4 ++- 7 files changed, 62 insertions(+), 6 deletions(-)
[flink] branch release-1.10 updated: [FLINK-14980][docs] add function ddl docs
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new c1099c1 [FLINK-14980][docs] add function ddl docs c1099c1 is described below commit c1099c1a3a5e14baa2db89530ac1bcb786b69e01 Author: hpeter AuthorDate: Tue Dec 31 00:46:25 2019 -0800 [FLINK-14980][docs] add function ddl docs this closes #10732. --- docs/dev/table/sql/alter.md | 26 +- docs/dev/table/sql/alter.zh.md | 26 +- docs/dev/table/sql/create.md| 22 ++ docs/dev/table/sql/create.zh.md | 23 +++ docs/dev/table/sql/drop.md | 20 +++- docs/dev/table/sql/drop.zh.md | 19 ++- 6 files changed, 132 insertions(+), 4 deletions(-) diff --git a/docs/dev/table/sql/alter.md b/docs/dev/table/sql/alter.md index 067de29c..1cf2b92 100644 --- a/docs/dev/table/sql/alter.md +++ b/docs/dev/table/sql/alter.md @@ -31,6 +31,7 @@ Flink SQL supports the following ALTER statements for now: - ALTER TABLE - ALTER DATABASE +- ALTER FUNCTION ## Run an ALTER statement @@ -134,4 +135,27 @@ Set one or more properties in the specified table. If a particular property is a ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...) {% endhighlight %} -Set one or more properties in the specified database. If a particular property is already set in the database, override the old value with the new one. \ No newline at end of file +Set one or more properties in the specified database. If a particular property is already set in the database, override the old value with the new one. + +## ALTER FUNCTION + +{% highlight sql%} +ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION + [IF EXISTS] [catalog_name.][db_name.]function_name + AS identifier [LANGUAGE JAVA|SCALA| +{% endhighlight %} + +Alter a catalog function with the new identifier which is full classpath for JAVA/SCALA and optional language tag. 
If a function doesn't exist in the catalog, an exception is thrown. + +**TEMPORARY** +Alter temporary catalog function that has catalog and database namespaces and overrides catalog functions. + +**TEMPORARY SYSTEM** +Alter temporary system function that has no namespace and overrides built-in functions + +**IF EXISTS** +If the function doesn't exist, nothing happens. + +**LANGUAGE JAVA|SCALA** +Language tag to instruct flink runtime how to execute the function. Currently only JAVA and SCALA are supported, the default language for a function is JAVA. + diff --git a/docs/dev/table/sql/alter.zh.md b/docs/dev/table/sql/alter.zh.md index 067de29c..4fed734 100644 --- a/docs/dev/table/sql/alter.zh.md +++ b/docs/dev/table/sql/alter.zh.md @@ -31,6 +31,7 @@ Flink SQL supports the following ALTER statements for now: - ALTER TABLE - ALTER DATABASE +- ALTER FUNCTION ## Run an ALTER statement @@ -134,4 +135,27 @@ Set one or more properties in the specified table. If a particular property is a ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...) {% endhighlight %} -Set one or more properties in the specified database. If a particular property is already set in the database, override the old value with the new one. \ No newline at end of file +Set one or more properties in the specified database. If a particular property is already set in the database, override the old value with the new one. + +## ALTER FUNCTION + +{% highlight sql%} +ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION + [IF EXISTS] [catalog_name.][db_name.]function_name + AS identifier [LANGUAGE JAVA|SCALA| +{% endhighlight %} + +Alter a catalog function that has catalog and database namespaces with the new identifier which is full classpath for JAVA/SCALA and optional language tag. If a function doesn't exist in the catalog, an exception is thrown. + +**TEMPORARY** +Alter temporary catalog function that has catalog and database namespaces and overrides catalog functions. 
+ +**TEMPORARY SYSTEM** +Alter temporary system function that has no namespace and overrides built-in functions + +**IF EXISTS** +If the function doesn't exist, nothing happens. + +**LANGUAGE JAVA|SCALA** +Language tag to instruct flink runtime how to execute the function. Currently only JAVA and SCALA are supported, the default language for a function is JAVA. + diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md index cc8ee29..2070db9 100644 --- a/docs/dev/table/sql/create.md +++ b/docs/dev/table/sql/create.md @@ -31,6 +31,7 @@ Flink SQL supports the following CREATE statements for now: - CREATE TABLE - CREATE DATABASE +- CREATE FUNCTION ## Run a CREATE statement @@ -225,3 +226,24 @@ Database properties used to store extra information related to this database. The key and value of expression `key1=
[flink] branch master updated (8830ef0 -> cac6b9d)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 8830ef0 [FLINK-15377][tests] Remove docker-engine install in Mesos docker image add cac6b9d [FLINK-14980][docs] add function ddl docs No new revisions were added by this update. Summary of changes: docs/dev/table/sql/alter.md | 26 +- docs/dev/table/sql/alter.zh.md | 26 +- docs/dev/table/sql/create.md| 22 ++ docs/dev/table/sql/create.zh.md | 23 +++ docs/dev/table/sql/drop.md | 20 +++- docs/dev/table/sql/drop.zh.md | 19 ++- 6 files changed, 132 insertions(+), 4 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15259][hive] HiveInspector.toInspectors() should convert Flink constant to Hive constant
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new f8b6bed [FLINK-15259][hive] HiveInspector.toInspectors() should convert Flink constant to Hive constant f8b6bed is described below commit f8b6bed54e21ce3a51f9d63389e5a037e89f9ca8 Author: Rui Li AuthorDate: Thu Dec 19 14:30:42 2019 +0800 [FLINK-15259][hive] HiveInspector.toInspectors() should convert Flink constant to Hive constant Convert constant Flink object to Hive object before creating Hive's constant object inspectors. this closes #10625. --- .../flink/connectors/hive/FlinkHiveException.java | 4 + .../flink/table/catalog/hive/client/HiveShim.java | 7 ++ .../table/catalog/hive/client/HiveShimV100.java| 98 +++--- .../table/catalog/hive/client/HiveShimV120.java| 61 -- .../table/catalog/hive/client/HiveShimV310.java| 30 +++ .../catalog/hive/util/HiveReflectionUtils.java | 2 +- .../functions/hive/conversion/HiveInspectors.java | 50 +-- .../conversion/WritableHiveObjectConversion.java | 8 +- .../flink/table/module/hive/HiveModuleTest.java| 30 +++ .../table/planner/codegen/GenerateUtils.scala | 2 +- 10 files changed, 170 insertions(+), 122 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FlinkHiveException.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FlinkHiveException.java index 6a4e2fd..7289dbd 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FlinkHiveException.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FlinkHiveException.java @@ -25,6 +25,10 @@ import org.apache.flink.annotation.PublicEvolving; @PublicEvolving public class FlinkHiveException extends RuntimeException { + public FlinkHiveException(String 
message) { + super(message); + } + public FlinkHiveException(Throwable cause) { super(cause); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java index 6b235a6..4a8ff3e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java @@ -43,6 +43,8 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.thrift.TException; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.Serializable; import java.time.LocalDate; @@ -232,4 +234,9 @@ public interface HiveShim extends Serializable { * Converts a hive date instance to LocalDate which is expected by DataFormatConverter. */ LocalDate toFlinkDate(Object hiveDate); + + /** +* Converts a Hive primitive java object to corresponding Writable object. 
+*/ + @Nullable Writable hivePrimitiveToWritable(@Nullable Object value); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java index 2b96efc..cf29034 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java @@ -18,12 +18,12 @@ package org.apache.flink.table.catalog.hive.client; +import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate; import org.apache.flink.table.functions.hive.FlinkHiveUDFException; -import org.apache.flink.table.functions.hive.conversion.HiveInspectors; import org.apache.flink.util.Preconditions; import org.apache.hadoop.conf.Configuration; @@ -32,6 +32,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; i
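The new shim method `@Nullable Writable hivePrimitiveToWritable(@Nullable Object value)` makes the null contract explicit: a NULL constant converts to `null` rather than throwing. A dependency-free sketch of that shape — the `FakeWritable` types below are stand-ins for Hadoop's real `Writable` implementations, and the supported-type list is illustrative:

```java
// Null-safe primitive-to-writable conversion in the spirit of
// HiveShim#hivePrimitiveToWritable: null maps to null, known primitives
// map to their writable wrapper, unknown types fail loudly.
public class WritableDemo {

    interface FakeWritable { String render(); }

    static final class IntWritable implements FakeWritable {
        final int v;
        IntWritable(int v) { this.v = v; }
        public String render() { return "IntWritable(" + v + ")"; }
    }

    static final class Text implements FakeWritable {
        final String v;
        Text(String v) { this.v = v; }
        public String render() { return "Text(" + v + ")"; }
    }

    static FakeWritable hivePrimitiveToWritable(Object value) {
        if (value == null) {
            return null; // the @Nullable contract: NULL constants are legal input
        }
        if (value instanceof Integer) {
            return new IntWritable((Integer) value);
        }
        if (value instanceof String) {
            return new Text((String) value);
        }
        throw new IllegalArgumentException(
            "Unsupported primitive type: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(hivePrimitiveToWritable(42).render());   // IntWritable(42)
        System.out.println(hivePrimitiveToWritable("hi").render()); // Text(hi)
        System.out.println(hivePrimitiveToWritable(null));          // null
    }
}
```

This is the property FLINK-15259 needs: constant object inspectors must be built from Hive-side objects, and a NULL literal has to survive the conversion instead of raising.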
[flink] branch master updated (f205d75 -> 6f3a077)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f205d75 [FLINK-15290][streaming] Configure user defined function with global configuration add 6f3a077 [FLINK-15259][hive] HiveInspector.toInspectors() should convert Flink constant to Hive constant No new revisions were added by this update. Summary of changes: .../flink/connectors/hive/FlinkHiveException.java | 4 + .../flink/table/catalog/hive/client/HiveShim.java | 7 ++ .../table/catalog/hive/client/HiveShimV100.java| 98 +++--- .../table/catalog/hive/client/HiveShimV120.java| 61 -- .../table/catalog/hive/client/HiveShimV310.java| 30 +++ .../catalog/hive/util/HiveReflectionUtils.java | 2 +- .../functions/hive/conversion/HiveInspectors.java | 50 +-- .../conversion/WritableHiveObjectConversion.java | 8 +- .../flink/table/module/hive/HiveModuleTest.java| 30 +++ .../table/planner/codegen/GenerateUtils.scala | 2 +- 10 files changed, 170 insertions(+), 122 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15290][streaming] Configure user defined function with global configuration
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 694ae9c [FLINK-15290][streaming] Configure user defined function with global configuration 694ae9c is described below commit 694ae9c9c943a8f8cc0aff5aa54ce7207eb3da21 Author: Rui Li AuthorDate: Tue Dec 17 21:01:55 2019 +0800 [FLINK-15290][streaming] Configure user defined function with global configuration Load configurations in HiveTableInputFormat to decide whether vector orc reader should be used. this closes #10632. --- .../flink/connectors/hive/HiveTableSource.java | 10 +- .../connectors/hive/read/HiveTableInputFormat.java | 18 ++-- .../flink/connectors/hive/HiveTableSourceTest.java | 103 + .../apache/flink/table/HiveVersionTestUtil.java| 1 + 4 files changed, 120 insertions(+), 12 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java index 1c0499c..5d654cf 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java @@ -18,6 +18,7 @@ package org.apache.flink.connectors.hive; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -133,10 +134,10 @@ public class HiveTableSource implements @SuppressWarnings("unchecked") TypeInformation typeInfo = (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType()); - HiveTableInputFormat inputFormat = 
getInputFormat(allHivePartitions); + Configuration conf = GlobalConfiguration.loadConfiguration(); + HiveTableInputFormat inputFormat = getInputFormat(allHivePartitions, conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER)); DataStreamSource source = execEnv.createInput(inputFormat, typeInfo); - Configuration conf = GlobalConfiguration.loadConfiguration(); if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) { int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX); if (max < 1) { @@ -162,9 +163,10 @@ public class HiveTableSource implements return source.name(explainSource()); } - private HiveTableInputFormat getInputFormat(List allHivePartitions) { + @VisibleForTesting + HiveTableInputFormat getInputFormat(List allHivePartitions, boolean useMapRedReader) { return new HiveTableInputFormat( - jobConf, catalogTable, allHivePartitions, projectedFields, limit, hiveVersion); + jobConf, catalogTable, allHivePartitions, projectedFields, limit, hiveVersion, useMapRedReader); } @Override diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java index 3e45fe3..f6a424f 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java @@ -18,12 +18,11 @@ package org.apache.flink.connectors.hive.read; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.io.LocatableInputSplitAssigner; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase; -import org.apache.flink.configuration.Configuration; import 
org.apache.flink.connectors.hive.FlinkHiveException; -import org.apache.flink.connectors.hive.HiveOptions; import org.apache.flink.connectors.hive.HiveTablePartition; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.table.catalog.CatalogTable; @@ -81,9 +80,10 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase partitions, int[] projectedFields, long limit, - String hiveVersion) { + String hiveVersion, + boolean us
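The diff above moves the `TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER` lookup out of `HiveTableInputFormat` and into `HiveTableSource`, which reads the global configuration once and passes a plain `boolean` down. That refactoring direction — resolve the flag at construction time, make the consumer take a boolean — can be sketched with a plain map standing in for Flink's `GlobalConfiguration` (the option key string and class names here are assumptions for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of config-driven reader selection per FLINK-15290: the flag is
// read once where the input format is built, and the selection logic only
// sees a boolean, which also makes it trivially unit-testable.
public class ReaderSelectionDemo {

    static final String FALLBACK_KEY = "table.exec.hive.fallback-mapred-reader";

    static String createReader(boolean useMapRedReader) {
        return useMapRedReader ? "mapred-record-reader" : "vectorized-orc-reader";
    }

    public static void main(String[] args) {
        // Stand-in for GlobalConfiguration.loadConfiguration().
        Map<String, String> globalConf = new HashMap<>();
        globalConf.put(FALLBACK_KEY, "false");

        boolean fallback =
            Boolean.parseBoolean(globalConf.getOrDefault(FALLBACK_KEY, "false"));
        System.out.println(createReader(fallback)); // vectorized path by default
        System.out.println(createReader(true));     // explicit mapred fallback
    }
}
```

Passing the resolved boolean instead of re-reading configuration inside the input format is what lets the commit's new `@VisibleForTesting getInputFormat(...)` overload exercise both reader paths in tests.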
[flink] branch master updated (e1340a0 -> f205d75)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from e1340a0 [FLINK-15409][table-planner-blink] Fix code generation in windowed join function (#10714) add f205d75 [FLINK-15290][streaming] Configure user defined function with global configuration No new revisions were added by this update. Summary of changes: .../flink/connectors/hive/HiveTableSource.java | 10 +- .../connectors/hive/read/HiveTableInputFormat.java | 18 ++-- .../flink/connectors/hive/HiveTableSourceTest.java | 103 + .../apache/flink/table/HiveVersionTestUtil.java| 1 + 4 files changed, 120 insertions(+), 12 deletions(-)
[flink] branch release-1.10 updated: [FLINK-14849][hive][doc] Fix documentation about Hive dependencies
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 7ada211 [FLINK-14849][hive][doc] Fix documentation about Hive dependencies 7ada211 is described below commit 7ada211863e02039b52088deb198611e8c762c47 Author: Rui Li AuthorDate: Wed Dec 25 10:54:46 2019 +0800 [FLINK-14849][hive][doc] Fix documentation about Hive dependencies closes #10681. --- docs/dev/table/hive/index.md| 220 --- docs/dev/table/hive/index.zh.md | 222 +--- 2 files changed, 279 insertions(+), 163 deletions(-) diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index ccf0e1d..245bada 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -88,10 +88,12 @@ Please note Hive itself have different features available for different versions ### Dependencies -To integrate with Hive, users need some dependencies in your `/lib/` directory in Flink distribution +To integrate with Hive, you need to add some extra dependencies to the `/lib/` directory in Flink distribution to make the integration work in Table API program or SQL in SQL Client. +Alternatively, you can put these dependencies in a dedicated folder, and add them to classpath with the `-C` +or `-l` option for Table API program or SQL Client respectively. -We are using Hive 2.3.4 and 1.2.1 as examples here. +Please find the required dependencies for different Hive major versions below. @@ -100,24 +102,63 @@ We are using Hive 2.3.4 and 1.2.1 as examples here. 
/flink-{{ site.version }} /lib - flink-dist{{ site.scala_version_suffix }}-{{ site.version }}.jar - flink-table{{ site.scala_version_suffix }}-{{ site.version }}.jar - // we highly recommend using Flink's blink planner with Hive integration - flink-table-blink{{ site.scala_version_suffix }}-{{ site.version }}.jar // Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jars flink-connector-hive{{ site.scala_version_suffix }}-{{ site.version }}.jar // Hadoop dependencies - // Pick the correct Hadoop dependency for your project. - // Hive 2.3.4 is built with Hadoop 2.7.2. We pick 2.7.5 which flink-shaded-hadoop is pre-built with, - // but users can pick their own hadoop version, as long as it's compatible with Hadoop 2.7.2 + // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively + // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop + // cluster and the Hive version you're using. flink-shaded-hadoop-2-uber-2.7.5-{{ site.shaded_version }}.jar // Hive dependencies hive-exec-2.3.4.jar - ... +{% endhighlight %} + + + +{% highlight txt %} +/flink-{{ site.version }} + /lib + + // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars + flink-connector-hive{{ site.scala_version_suffix }}-{{ site.version }}.jar + + // Hadoop dependencies + // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively + // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop + // cluster and the Hive version you're using. + flink-shaded-hadoop-2-uber-2.6.5-{{ site.shaded_version }}.jar + + // Hive dependencies + hive-metastore-1.0.0.jar + hive-exec-1.0.0.jar + libfb303-0.9.0.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately + +{% endhighlight %} + + + +{% highlight txt %} +/flink-{{ site.version }} + /lib + + // Flink's Hive connector. 
Contains flink-hadoop-compatibility and flink-orc jars + flink-connector-hive{{ site.scala_version_suffix }}-{{ site.version }}.jar + + // Hadoop dependencies + // You can pick a pre-built Hadoop uber jar provided by Flink, alternatively + // you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop + // cluster and the Hive version you're using. + flink-shaded-hadoop-2-uber-2.6.5-{{ site.shaded_version }}.jar + + // Hive dependencies + hive-metastore-1.1.0.jar + hive-exec-1.1.0.jar + libfb303-0.9.2.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately + {% endhighlight %} @@ -125,74 +166,117 @@ We are using Hive 2.3.4 and 1.2.1 as examples here. {% highlight txt %} /flink-{{ site.version }} /lib - flink-dist{{ site.scala_version_suffix }}-{{ site.version }}.jar - flink-table{{ site.scala_version_suffix }}-{{ site.version }}.jar - // we highly reco
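The two ways of supplying these jars described in the doc change above can be sketched as shell commands. All paths, artifact names, and versions below are placeholders, not exact file names:

```sh
# Option 1: drop the jars into Flink's lib/ directory
cp flink-connector-hive_2.11-<version>.jar hive-exec-2.3.4.jar $FLINK_HOME/lib/

# Option 2: keep them in a dedicated folder and add them at launch time:
#   -C for a Table API program, -l for the SQL Client
bin/flink run -C file:///path/to/hive-deps/flink-connector-hive_2.11-<version>.jar myJob.jar
bin/sql-client.sh embedded -l /path/to/hive-deps
```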
[flink] branch master updated (2e0ba5d -> 28b6221)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2e0ba5d [FLINK-15189][hive][doc] Add documentation for hive view add 28b6221 [FLINK-14849][hive][doc] Fix documentation about Hive dependencies No new revisions were added by this update. Summary of changes: docs/dev/table/hive/index.md| 220 --- docs/dev/table/hive/index.zh.md | 222 +--- 2 files changed, 279 insertions(+), 163 deletions(-)
[flink] branch release-1.9 updated (85e0f15 -> baeb9b3)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git. from 85e0f15 [FLINK-15361][parquet] ParquetTableSource should pass predicate in projectFields add baeb9b3 [FLINK-15412][hive] LocalExecutorITCase#testParameterizedTypes failed in travis No new revisions were added by this update. Summary of changes: .../table/client/gateway/local/DependencyTest.java | 18 +++--- 1 file changed, 15 insertions(+), 3 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15189][hive][doc] Add documentation for hive view
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new a87bc8f [FLINK-15189][hive][doc] Add documentation for hive view a87bc8f is described below commit a87bc8ffda040d7bbcd489f0474abeb44cdd77df Author: Rui Li AuthorDate: Thu Dec 26 20:36:21 2019 +0800 [FLINK-15189][hive][doc] Add documentation for hive view this closes #10701. --- docs/dev/table/hive/read_write_hive.md| 7 +++ docs/dev/table/hive/read_write_hive.zh.md | 7 +++ 2 files changed, 14 insertions(+) diff --git a/docs/dev/table/hive/read_write_hive.md b/docs/dev/table/hive/read_write_hive.md index e44c2ac..f601a22 100644 --- a/docs/dev/table/hive/read_write_hive.md +++ b/docs/dev/table/hive/read_write_hive.md @@ -110,6 +110,13 @@ __ __ {% endhighlight %} +### Querying Hive views + +If you need to query Hive views, please note: + +1. You have to use the Hive catalog as your current catalog before you can query views in that catalog. It can be done by either `tableEnv.useCatalog(...)` in Table API or `USE CATALOG ...` in SQL Client. +2. Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals. Make sure the view's query is compatible with Flink grammar. + ## Writing To Hive Similarly, data can be written into hive using an `INSERT` clause. diff --git a/docs/dev/table/hive/read_write_hive.zh.md b/docs/dev/table/hive/read_write_hive.zh.md index e44c2ac..f601a22 100644 --- a/docs/dev/table/hive/read_write_hive.zh.md +++ b/docs/dev/table/hive/read_write_hive.zh.md @@ -110,6 +110,13 @@ __ __ {% endhighlight %} +### Querying Hive views + +If you need to query Hive views, please note: + +1. You have to use the Hive catalog as your current catalog before you can query views in that catalog. It can be done by either `tableEnv.useCatalog(...)` in Table API or `USE CATALOG ...` in SQL Client. +2. 
Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals. Make sure the view's query is compatible with Flink grammar. + ## Writing To Hive Similarly, data can be written into hive using an `INSERT` clause.
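The two steps listed in the doc change above can be sketched in SQL Client syntax (catalog, database, and view names here are hypothetical):

```sql
-- 1. make the Hive catalog the current catalog
USE CATALOG myhive;
-- 2. query the view; the view's defining query must also be valid Flink SQL
SELECT * FROM db1.v1;
```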
[flink] branch master updated (52fdee1 -> 2e0ba5d)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 52fdee1 [FLINK-15239][table-planner-blink] Fix CompileUtils:#COMPILED_CACHE leaks class loaders (#10620) add 2e0ba5d [FLINK-15189][hive][doc] Add documentation for hive view No new revisions were added by this update. Summary of changes: docs/dev/table/hive/read_write_hive.md| 7 +++ docs/dev/table/hive/read_write_hive.zh.md | 7 +++ 2 files changed, 14 insertions(+)
[flink] branch release-1.10 updated (de45fdb -> 694c588)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from de45fdb [hotfix][e2e] Disable Streaming File Sink s3 e2e test until FLINK-15355 has been fixed add 694c588 [FLINK-15391][hive] DATE and TIMESTAMP partition columns don't work No new revisions were added by this update. Summary of changes: .../connectors/hive/HivePartitionComputer.java | 68 ++ .../flink/connectors/hive/HiveTableSink.java | 5 +- .../flink/connectors/hive/HiveTableSource.java | 17 +- .../flink/table/catalog/hive/HiveCatalog.java | 2 +- .../table/catalog/hive/util/HiveTableUtil.java | 26 +++-- .../hive/conversion/HiveObjectConversion.java | 4 +- .../hive/conversion/IdentityConversion.java| 2 + .../conversion/WritableHiveObjectConversion.java | 2 + .../connectors/hive/TableEnvHiveConnectorTest.java | 28 + .../table/catalog/hive/util/HiveTableUtilTest.java | 11 +++- .../flink/orc/vector/AbstractOrcColumnVector.java | 4 ++ .../table/filesystem/RowPartitionComputer.java | 6 +- 12 files changed, 160 insertions(+), 15 deletions(-) create mode 100644 flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HivePartitionComputer.java
[flink] branch master updated (d3f1f06 -> 2e9a7f3)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from d3f1f06 [FLINK-15288][kubernetes] Starting jobmanager pod should respect containerized heap-cutoff add 2e9a7f3 [FLINK-15391][hive] DATE and TIMESTAMP partition columns don't work No new revisions were added by this update. Summary of changes: .../connectors/hive/HivePartitionComputer.java | 68 ++ .../flink/connectors/hive/HiveTableSink.java | 5 +- .../flink/connectors/hive/HiveTableSource.java | 17 +- .../flink/table/catalog/hive/HiveCatalog.java | 2 +- .../table/catalog/hive/util/HiveTableUtil.java | 26 +++-- .../hive/conversion/HiveObjectConversion.java | 4 +- .../hive/conversion/IdentityConversion.java| 2 + .../conversion/WritableHiveObjectConversion.java | 2 + .../connectors/hive/TableEnvHiveConnectorTest.java | 28 + .../table/catalog/hive/util/HiveTableUtilTest.java | 11 +++- .../flink/orc/vector/AbstractOrcColumnVector.java | 4 ++ .../table/filesystem/RowPartitionComputer.java | 6 +- 12 files changed, 160 insertions(+), 15 deletions(-) create mode 100644 flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HivePartitionComputer.java
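What this fix enables can be sketched as SQL Client statements. The table and partition values are illustrative, and the table is assumed to already exist on the Hive side with a `DATE` partition column:

```sql
-- assumes a Hive table created as:
--   CREATE TABLE mylogs (msg STRING) PARTITIONED BY (dt DATE)
INSERT OVERWRITE mylogs PARTITION (dt='2019-12-25') SELECT 'hello';
SELECT msg FROM mylogs WHERE dt = DATE '2019-12-25';
```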
[flink] branch release-1.10 updated: [FLINK-15342][hive] add IT for querying Hive view
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new e999c6a [FLINK-15342][hive] add IT for querying Hive view e999c6a is described below commit e999c6ad80b9f0c1e289db1a3fe3fc36b6a6e9ed Author: Rui Li AuthorDate: Fri Dec 20 15:58:07 2019 +0800 [FLINK-15342][hive] add IT for querying Hive view this closes #10643. --- .../connectors/hive/TableEnvHiveConnectorTest.java | 35 ++ 1 file changed, 35 insertions(+) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java index 7b5bac7..8eb6bf0 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java @@ -448,6 +448,41 @@ public class TableEnvHiveConnectorTest { } } + @Test + public void testViews() throws Exception { + hiveShell.execute("create database db1"); + try { + hiveShell.execute("create table db1.src (key int,val string)"); + HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src") + .addRow(new Object[]{1, "a"}) + .addRow(new Object[]{1, "aa"}) + .addRow(new Object[]{1, "aaa"}) + .addRow(new Object[]{2, "b"}) + .addRow(new Object[]{3, "c"}) + .addRow(new Object[]{3, "ccc"}) + .commit(); + hiveShell.execute("create table db1.keys (key int,name string)"); + HiveTestUtils.createTextTableInserter(hiveShell, "db1", "keys") + .addRow(new Object[]{1, "key1"}) + .addRow(new Object[]{2, "key2"}) + .addRow(new Object[]{3, "key3"}) + .addRow(new Object[]{4, "key4"}) + .commit(); + hiveShell.execute("create view db1.v1 as select key as k,val as v 
from db1.src limit 2"); + hiveShell.execute("create view db1.v2 as select key,count(*) from db1.src group by key having count(*)>1 order by key"); + hiveShell.execute("create view db1.v3 as select k.key,k.name,count(*) from db1.src s join db1.keys k on s.key=k.key group by k.key,k.name order by k.key"); + TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); + List results = HiveTestUtils.collectTable(tableEnv, tableEnv.sqlQuery("select count(v) from db1.v1")); + assertEquals("[2]", results.toString()); + results = HiveTestUtils.collectTable(tableEnv, tableEnv.sqlQuery("select * from db1.v2")); + assertEquals("[1,3, 3,2]", results.toString()); + results = HiveTestUtils.collectTable(tableEnv, tableEnv.sqlQuery("select * from db1.v3")); + assertEquals("[1,key1,3, 2,key2,1, 3,key3,2]", results.toString()); + } finally { + hiveShell.execute("drop database db1 cascade"); + } + } + private TableEnvironment getTableEnvWithHiveCatalog() { TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(); tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
[flink] branch master updated (c685ee9 -> b3ff6ae)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c685ee9 [hotfix][doc] add section for various optimizations of reading Hive tables add b3ff6ae [FLINK-15342][hive] add IT for querying Hive view No new revisions were added by this update. Summary of changes: .../connectors/hive/TableEnvHiveConnectorTest.java | 35 ++ 1 file changed, 35 insertions(+)
[flink] branch master updated (16b6357 -> c685ee9)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 16b6357 [hotfix] Update task heap/offheap config option descriptions for preciseness. add c685ee9 [hotfix][doc] add section for various optimizations of reading Hive tables No new revisions were added by this update. Summary of changes: docs/dev/table/hive/read_write_hive.md| 26 +- docs/dev/table/hive/read_write_hive.zh.md | 26 +- 2 files changed, 50 insertions(+), 2 deletions(-)
[flink] branch release-1.10 updated: [hotfix][doc] add section for various optimizations of reading Hive tables
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new bd25a9f [hotfix][doc] add section for various optimizations of reading Hive tables bd25a9f is described below commit bd25a9ff935b4bdf0bd842c04255d9b926f16111 Author: bowen.li AuthorDate: Thu Dec 19 21:09:00 2019 -0800 [hotfix][doc] add section for various optimizations of reading Hive tables this closes #10640. --- docs/dev/table/hive/read_write_hive.md| 26 +- docs/dev/table/hive/read_write_hive.zh.md | 26 +- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/docs/dev/table/hive/read_write_hive.md b/docs/dev/table/hive/read_write_hive.md index f0ffa22..e44c2ac 100644 --- a/docs/dev/table/hive/read_write_hive.md +++ b/docs/dev/table/hive/read_write_hive.md @@ -137,11 +137,34 @@ Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08' Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08'; {% endhighlight %} + ## Formats We have tested on the following of table storage formats: text, csv, SequenceFile, ORC, and Parquet. -# -- ORC Vectorized Optimization -- + +## Optimizations + +### Partition Pruning + +Flink uses partition pruning as a performance optimization to limits the number of files and partitions +that Flink reads when querying Hive tables. When your data is partitioned, Flink only reads a subset of the partitions in +a Hive table when a query matches certain filter criteria. + +### Projection Pushdown + +Flink leverages projection pushdown to minimize data transfer between Flink and Hive tables by omitting +unnecessary fields from table scans. + +It is especially beneficial when a table contains many columns. 
+ +### Limit Pushdown + +For queries with LIMIT clause, Flink will limit the number of output records wherever possible to minimize the +amount of data transferred across network. + +### ORC Vectorized Optimization upon Read + Optimization is used automatically when the following conditions are met: - Columns without complex data type, like hive types: List, Map, Struct, Union. @@ -153,6 +176,7 @@ This feature is turned on by default. If there is a problem, you can use this co table.exec.hive.fallback-mapred-reader=true {% endhighlight %} + ## Roadmap We are planning and actively working on supporting features like diff --git a/docs/dev/table/hive/read_write_hive.zh.md b/docs/dev/table/hive/read_write_hive.zh.md index f0ffa22..e44c2ac 100644 --- a/docs/dev/table/hive/read_write_hive.zh.md +++ b/docs/dev/table/hive/read_write_hive.zh.md @@ -137,11 +137,34 @@ Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08' Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08'; {% endhighlight %} + ## Formats We have tested on the following of table storage formats: text, csv, SequenceFile, ORC, and Parquet. -# -- ORC Vectorized Optimization -- + +## Optimizations + +### Partition Pruning + +Flink uses partition pruning as a performance optimization to limits the number of files and partitions +that Flink reads when querying Hive tables. When your data is partitioned, Flink only reads a subset of the partitions in +a Hive table when a query matches certain filter criteria. + +### Projection Pushdown + +Flink leverages projection pushdown to minimize data transfer between Flink and Hive tables by omitting +unnecessary fields from table scans. + +It is especially beneficial when a table contains many columns. + +### Limit Pushdown + +For queries with LIMIT clause, Flink will limit the number of output records wherever possible to minimize the +amount of data transferred across network. 
+ +### ORC Vectorized Optimization upon Read + Optimization is used automatically when the following conditions are met: - Columns without complex data type, like hive types: List, Map, Struct, Union. @@ -153,6 +176,7 @@ This feature is turned on by default. If there is a problem, you can use this co table.exec.hive.fallback-mapred-reader=true {% endhighlight %} + ## Roadmap We are planning and actively working on supporting features like
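The three read-side optimizations added to the doc above can each be triggered by an ordinary query shape. The table and column names below are hypothetical:

```sql
-- Partition pruning: only partitions matching the filter are read
SELECT * FROM myparttable WHERE my_date = '2019-08-08';

-- Projection pushdown: only the referenced column is scanned
SELECT name FROM mytable;

-- Limit pushdown: the scan stops emitting records as early as possible
SELECT * FROM mytable LIMIT 10;
```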
[flink] branch release-1.10 updated: [FLINK-15344][documentation] Update limitations in hive udf document
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 324537a [FLINK-15344][documentation] Update limitations in hive udf document 324537a is described below commit 324537a107ffb8f3c9ceed4b6d8a3a3bdb3c5711 Author: JingsongLi AuthorDate: Fri Dec 20 16:38:36 2019 +0800 [FLINK-15344][documentation] Update limitations in hive udf document this closes #10647. --- docs/dev/table/hive/hive_functions.md| 94 ++-- docs/dev/table/hive/hive_functions.zh.md | 94 ++-- 2 files changed, 82 insertions(+), 106 deletions(-) diff --git a/docs/dev/table/hive/hive_functions.md b/docs/dev/table/hive/hive_functions.md index b0e4b2a..e2b7ac5 100644 --- a/docs/dev/table/hive/hive_functions.md +++ b/docs/dev/table/hive/hive_functions.md @@ -1,5 +1,5 @@ --- -title: "Hive UDF and Built-in Functions" +title: "Hive functions" nav-parent_id: hive_tableapi nav-pos: 3 --- @@ -22,6 +22,46 @@ specific language governing permissions and limitations under the License. --> +## Use Hive Built-in Functions via HiveModule + +The `HiveModule` provides Hive built-in functions as Flink system (built-in) functions to Flink SQL and Table API users. + +For detailed information, please refer to [HiveModule]({{ site.baseurl }}/dev/table/modules.html#hivemodule). 
+ + + +{% highlight java %} + +String name= "myhive"; +String version = "2.3.4"; + +tableEnv.loadModue(name, new HiveModule(version)); +{% endhighlight %} + + +{% highlight scala %} + +val name= "myhive" +val version = "2.3.4" + +tableEnv.loadModue(name, new HiveModule(version)); +{% endhighlight %} + + +{% highlight yaml %} +modules: + - name: core + type: core + - name: myhive + type: hive + hive-version: 2.3.4 +{% endhighlight %} + + + +* NOTE that some Hive built-in functions in older versions have [thread safety issues](https://issues.apache.org/jira/browse/HIVE-16183). +We recommend users patch their own Hive to fix them. + ## Hive User Defined Functions Users can use their existing Hive User Defined Functions in Flink. @@ -154,55 +194,3 @@ Then, users can use them in SQL as: Flink SQL> select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from mysourcetable, lateral table(myudtf(name, 1)) as T(s); {% endhighlight %} - - -### Limitations - -Hive built-in functions are currently not supported out of box in Flink. To use Hive built-in functions, users must register them manually in Hive Metastore first. - -Support for Hive functions has only been tested for Flink batch in Blink planner. - -Hive functions currently cannot be used across catalogs in Flink. - -Please reference to [Hive]({{ site.baseurl }}/dev/table/hive/index.html) for data type limitations. - - -## Use Hive Built-in Functions via HiveModule - -The `HiveModule` provides Hive built-in functions as Flink system (built-in) functions to Flink SQL and Table API users. - -For detailed information, please refer to [HiveModule]({{ site.baseurl }}/dev/table/modules.html#hivemodule). 
- - - -{% highlight java %} - -String name= "myhive"; -String version = "2.3.4"; - -tableEnv.loadModue(name, new HiveModule(version)); -{% endhighlight %} - - -{% highlight scala %} - -val name= "myhive" -val version = "2.3.4" - -tableEnv.loadModue(name, new HiveModule(version)); -{% endhighlight %} - - -{% highlight yaml %} -modules: - - name: core - type: core - - name: myhive - type: hive - hive-version: 2.3.4 -{% endhighlight %} - - - -* NOTE that some Hive built-in functions in older versions have [thread safety issues](https://issues.apache.org/jira/browse/HIVE-16183). -We recommend users patch their own Hive to fix them. diff --git a/docs/dev/table/hive/hive_functions.zh.md b/docs/dev/table/hive/hive_functions.zh.md index b0e4b2a..e2b7ac5 100644 --- a/docs/dev/table/hive/hive_functions.zh.md +++ b/docs/dev/table/hive/hive_functions.zh.md @@ -1,5 +1,5 @@ --- -title: "Hive UDF and Built-in Functions" +title: "Hive functions" nav-parent_id: hive_tableapi nav-pos: 3 --- @@ -22,6 +22,46 @@ specific language governing permissions and limitations under the License. --> +## Use Hive Built-in Functions via HiveModule + +The `HiveModule` provides Hive built-in functions as Flink system (built-in) functions to Flink SQL and Table API users. + +For detailed information, please refer to [HiveModule]({{ site.baseurl }}/dev/table/modules.html#hivemodule). + + + +{% highlight java %} + +String name= "myhive"; +String version = "2.3.4"
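One detail worth flagging in the snippets above: the Java and Scala examples in the diff call `tableEnv.loadModue(...)`, which appears to be a misspelling of the Table API method `loadModule`. The declarative equivalent for the SQL Client, as the same diff shows, is a YAML fragment like the following (`hive-version` is illustrative):

```yaml
modules:
  - name: core
    type: core
  - name: myhive
    type: hive
    hive-version: 2.3.4
```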
[flink] branch master updated (393b91f -> 5ec3fea)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 393b91f [FLINK-15243][table][docs] Fix documentation about how to set line feed as delimiter for csv format add 5ec3fea [FLINK-15344][documentation] Update limitations in hive udf document No new revisions were added by this update. Summary of changes: docs/dev/table/hive/hive_functions.md| 94 ++-- docs/dev/table/hive/hive_functions.zh.md | 94 ++-- 2 files changed, 82 insertions(+), 106 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15348][hive] Fix orc optimization for version less than 2.3 by introducing orc shim
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 37fbbd4 [FLINK-15348][hive] Fix orc optimization for version less than 2.3 by introducing orc shim 37fbbd4 is described below commit 37fbbd4ba57d356b1877a9b7d600254550b21321 Author: JingsongLi AuthorDate: Fri Dec 20 14:42:33 2019 +0800 [FLINK-15348][hive] Fix orc optimization for version less than 2.3 by introducing orc shim add shim to support hive orc version less than 2.3 this closes #10618. --- flink-connectors/flink-connector-hive/pom.xml | 13 ++ .../connectors/hive/read/HiveTableInputFormat.java | 2 +- .../hive/read/HiveVectorizedOrcSplitReader.java| 2 + flink-formats/flink-orc/pom.xml| 8 +- .../flink/orc/OrcColumnarRowSplitReader.java | 3 + .../org/apache/flink/orc/OrcRowSplitReader.java| 3 +- .../java/org/apache/flink/orc/OrcSplitReader.java | 127 +++-- .../org/apache/flink/orc/OrcSplitReaderUtil.java | 3 + .../java/org/apache/flink/orc/shim/OrcShim.java| 78 .../org/apache/flink/orc/shim/OrcShimV200.java | 200 + .../org/apache/flink/orc/shim/OrcShimV210.java | 37 .../org/apache/flink/orc/shim/OrcShimV230.java | 57 ++ .../flink/orc/OrcColumnarRowSplitReaderTest.java | 1 + .../apache/flink/orc/OrcRowInputFormatTest.java| 99 +- pom.xml| 1 + 15 files changed, 480 insertions(+), 154 deletions(-) diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index 7a7bfb0..16ec6ae 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -761,6 +761,19 @@ under the License. 
2.2.0 + + + org.apache.orc + orc-core + ${orc.version} + + + org.apache.hadoop + hadoop-common + + + + hive-3.1.1 diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java index 8d28851..3e45fe3 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java @@ -115,7 +115,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase org.apache.orc orc-core - 1.4.3 + ${orc.version} @@ -91,6 +91,12 @@ under the License. provided + + org.apache.commons + commons-lang3 + provided + + diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java index ffdbd05..bea6331 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java @@ -19,6 +19,7 @@ package org.apache.flink.orc; import org.apache.flink.core.fs.Path; +import org.apache.flink.orc.shim.OrcShim; import org.apache.flink.table.dataformat.BaseRow; import org.apache.flink.table.dataformat.ColumnarRow; import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch; @@ -41,6 +42,7 @@ public class OrcColumnarRowSplitReader extends OrcSplitReader { private final ColumnarRow row; public OrcColumnarRowSplitReader( + OrcShim shim, Configuration conf, TypeDescription schema, int[] selectedFields, @@ -51,6 +53,7 @@ public class
[flink] branch master updated (0f88c39 -> 790a077)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 0f88c39 [FLINK-15361][parquet] ParquetTableSource should pass predicate in projectFields add 790a077 [FLINK-15348][hive] Fix orc optimization for version less than 2.3 by introducing orc shim No new revisions were added by this update. Summary of changes: flink-connectors/flink-connector-hive/pom.xml | 13 ++ .../connectors/hive/read/HiveTableInputFormat.java | 2 +- .../hive/read/HiveVectorizedOrcSplitReader.java| 2 + flink-formats/flink-orc/pom.xml| 8 +- .../flink/orc/OrcColumnarRowSplitReader.java | 3 + .../org/apache/flink/orc/OrcRowSplitReader.java| 3 +- .../java/org/apache/flink/orc/OrcSplitReader.java | 127 +++-- .../org/apache/flink/orc/OrcSplitReaderUtil.java | 3 + .../java/org/apache/flink/orc/shim/OrcShim.java| 78 .../org/apache/flink/orc/shim/OrcShimV200.java | 200 + .../org/apache/flink/orc/shim/OrcShimV210.java | 16 +- .../org/apache/flink/orc/shim/OrcShimV230.java | 57 ++ .../flink/orc/OrcColumnarRowSplitReaderTest.java | 1 + .../apache/flink/orc/OrcRowInputFormatTest.java| 99 +- pom.xml| 1 + 15 files changed, 452 insertions(+), 161 deletions(-) create mode 100644 flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShim.java create mode 100644 flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV200.java copy flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/Disconnected.java => flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV210.java (68%) create mode 100644 flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV230.java
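The shim approach this commit introduces can be sketched as a version-dispatch pattern: one interface per behavior that differs across ORC versions, with a factory choosing the implementation from the version string. The class and method names below are simplified placeholders, not the actual `flink-orc` API:

```java
// A factory picks an implementation by ORC version; older versions fall back
// to the oldest supported shim. Names are illustrative.
interface OrcShim {
    String name();

    static OrcShim create(String orcVersion) {
        if (orcVersion.startsWith("2.3")) {
            return new OrcShimV230();
        } else if (orcVersion.startsWith("2.1") || orcVersion.startsWith("2.2")) {
            return new OrcShimV210();
        }
        return new OrcShimV200(); // oldest supported behavior as the fallback
    }
}

class OrcShimV200 implements OrcShim { public String name() { return "v2.0"; } }
class OrcShimV210 extends OrcShimV200 { public String name() { return "v2.1"; } }
class OrcShimV230 implements OrcShim { public String name() { return "v2.3"; } }

public class ShimDemo {
    public static void main(String[] args) {
        System.out.println(OrcShim.create("2.3.4").name()); // v2.3
        System.out.println(OrcShim.create("1.4.3").name()); // v2.0
    }
}
```

The payoff of this structure is visible in the diff summary: supporting a new version range means adding one small class rather than threading version checks through the reader.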
[flink] branch release-1.9 updated: [FLINK-15361][parquet] ParquetTableSource should pass predicate in projectFields
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.9 by this push: new 85e0f15 [FLINK-15361][parquet] ParquetTableSource should pass predicate in projectFields 85e0f15 is described below commit 85e0f1534eeb0fc5218007ee09a90d9b3f8bad0e Author: JingsongLi AuthorDate: Mon Dec 23 15:30:47 2019 +0800 [FLINK-15361][parquet] ParquetTableSource should pass predicate in projectFields fix the problem, when after projectFields, ParquetTableSource will loose predicates. this closes #10660. --- .../main/java/org/apache/flink/formats/parquet/ParquetTableSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java index 7438009..494b8a6 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java @@ -156,7 +156,7 @@ public class ParquetTableSource @Override public TableSource projectFields(int[] fields) { - return new ParquetTableSource(path, parquetSchema, parquetConfig, recursiveEnumeration, fields, null); + return new ParquetTableSource(path, parquetSchema, parquetConfig, recursiveEnumeration, fields, predicate); } @Override
[flink] branch release-1.10 updated: [FLINK-15361][parquet] ParquetTableSource should pass predicate in projectFields
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 627a472 [FLINK-15361][parquet] ParquetTableSource should pass predicate in projectFields 627a472 is described below commit 627a47289c350d6ef0e6cd00cf62f021f2ead614 Author: JingsongLi AuthorDate: Mon Dec 23 15:30:47 2019 +0800 [FLINK-15361][parquet] ParquetTableSource should pass predicate in projectFields fix the problem, when after projectFields, ParquetTableSource will loose predicates. this closes #10660. --- .../main/java/org/apache/flink/formats/parquet/ParquetTableSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java index a8d88fd..a8b90f4 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java @@ -156,7 +156,7 @@ public class ParquetTableSource @Override public TableSource projectFields(int[] fields) { - return new ParquetTableSource(path, parquetSchema, parquetConfig, recursiveEnumeration, fields, null); + return new ParquetTableSource(path, parquetSchema, parquetConfig, recursiveEnumeration, fields, predicate); } @Override
[flink] branch master updated (dcf4869 -> 0f88c39)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from dcf4869 [FLINK-15272][table-planner-blink] Improve exception message when insert partition with values (#10591) add 0f88c39 [FLINK-15361][parquet] ParquetTableSource should pass predicate in projectFields No new revisions were added by this update. Summary of changes: .../main/java/org/apache/flink/formats/parquet/ParquetTableSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
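The one-line fix in this commit is easy to miss, so here is the bug pattern in isolation, using a hypothetical minimal class rather than the actual `ParquetTableSource`: a "copy with projection" method rebuilt the object with `null` instead of carrying the already pushed-down predicate forward.

```java
// Minimal sketch of the bug class: projectFields() must copy *every* field,
// including ones set by earlier optimizer passes (here, a pushed-down predicate).
final class MiniSource {
    final int[] fields;
    final String predicate;

    MiniSource(int[] fields, String predicate) {
        this.fields = fields;
        this.predicate = predicate;
    }

    MiniSource projectFields(int[] newFields) {
        // buggy version returned: new MiniSource(newFields, null)
        return new MiniSource(newFields, predicate); // fixed: keep the predicate
    }
}

public class PredicateDemo {
    public static void main(String[] args) {
        MiniSource src = new MiniSource(new int[]{0, 1, 2}, "x > 5");
        System.out.println(src.projectFields(new int[]{0}).predicate); // x > 5
    }
}
```

With the buggy copy, the filter was silently dropped after projection pushdown, so the projected source re-read rows the predicate should have excluded.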
[flink] branch release-1.10 updated: [FLINK-14796][hive][doc] Add document about limitations of different Hive versions
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 38cfb98 [FLINK-14796][hive][doc] Add document about limitations of different Hive versions 38cfb98 is described below commit 38cfb98c01d26d902ec96b51f04de8a85112e1ed Author: bowen.li AuthorDate: Fri Dec 20 11:32:20 2019 -0800 [FLINK-14796][hive][doc] Add document about limitations of different Hive versions this closes #10650. --- docs/dev/table/hive/index.md| 8 docs/dev/table/hive/index.zh.md | 8 2 files changed, 16 insertions(+) diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index ff4dac6..ccf0e1d 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -78,6 +78,14 @@ Flink supports the following Hive versions. - 3.1.1 - 3.1.2 +Please note Hive itself have different features available for different versions, and these issues are not caused by Flink: + +- Hive built-in functions are supported in 1.2.0 and later. +- Column constraints, i.e. PRIMARY KEY and NOT NULL, are supported in 3.1.0 and later. +- Altering table statistics is supported in 1.2.0 and later. +- `DATE` column statistics are supported in 1.2.0 and later. +- Writing to ORC tables is not supported in 2.0.x. + ### Dependencies To integrate with Hive, users need some dependencies in your `/lib/` directory in Flink distribution diff --git a/docs/dev/table/hive/index.zh.md b/docs/dev/table/hive/index.zh.md index 99dce07..afd9da7 100644 --- a/docs/dev/table/hive/index.zh.md +++ b/docs/dev/table/hive/index.zh.md @@ -78,6 +78,14 @@ Flink supports the following Hive versions. - 3.1.1 - 3.1.2 +Please note Hive itself have different features available for different versions, and these issues are not caused by Flink: + +- Hive built-in functions are supported in 1.2.0 and later. +- Column constraints, i.e. PRIMARY KEY and NOT NULL, are supported in 3.1.0 and later. +- Altering table statistics is supported in 1.2.0 and later. +- `DATE` column statistics are supported in 1.2.0 and later. +- Writing to ORC tables is not supported in 2.0.x. + ### Dependencies To integrate with Hive, users need some dependencies in your `/lib/` directory in Flink distribution
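The limitations listed in this doc change are all version-gated ("supported in X and later"). As a rough illustration only — Flink's actual implementation uses its Java `HiveShim`/version utilities, not this code — such gating amounts to comparing dotted version strings:

```python
def parse_version(v):
    # "2.3.4" -> (2, 3, 4); tuples compare component-wise
    return tuple(int(p) for p in v.split("."))

def supports_column_constraints(hive_version):
    # Per the list above: PRIMARY KEY / NOT NULL need Hive 3.1.0 or later
    return parse_version(hive_version) >= (3, 1, 0)

def supports_builtin_functions(hive_version):
    # Hive built-in functions are available from 1.2.0 on
    return parse_version(hive_version) >= (1, 2, 0)
```

The helper names here are hypothetical; they just mirror two bullets from the documented list.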
[flink] branch master updated (c84f206 -> 0468708)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c84f206 [FLINK-15322][hive] Parquet test fails with Hive versions prior to 1.2.0 add 0468708 [FLINK-14796][hive][doc] Add document about limitations of different Hive versions No new revisions were added by this update. Summary of changes: docs/dev/table/hive/index.md| 8 docs/dev/table/hive/index.zh.md | 8 2 files changed, 16 insertions(+)
[flink] branch release-1.10 updated: [FLINK-15322][hive] Parquet test fails with Hive versions prior to 1.2.0
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new f06fd92 [FLINK-15322][hive] Parquet test fails with Hive versions prior to 1.2.0 f06fd92 is described below commit f06fd92c29b16a1ea1eccfd0645115cd7e8093d4 Author: Rui Li AuthorDate: Thu Dec 19 16:58:39 2019 +0800 [FLINK-15322][hive] Parquet test fails with Hive versions prior to 1.2.0 this closes #10626. --- .../connectors/hive/TableEnvHiveConnectorTest.java | 35 ++ 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java index 9fccea5..7b5bac7 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java @@ -48,12 +48,14 @@ import java.sql.Date; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -144,13 +146,24 @@ public class TableEnvHiveConnectorTest { } else { suffix = "stored as " + format; } - hiveShell.execute("create table db1.src (i int,s string,ts timestamp,dt date) " + suffix); - hiveShell.execute("create table db1.dest (i int,s string,ts timestamp,dt date) " + suffix); + String tableSchema; + List row1 = new ArrayList<>(Arrays.asList(1, "a", "2018-08-20 00:00:00.1")); + List row2 = new ArrayList<>(Arrays.asList(2, "b", "2019-08-26 00:00:00.1")); + // some data types are not supported for parquet tables in early versions -- https://issues.apache.org/jira/browse/HIVE-6384 + if (HiveVersionTestUtil.HIVE_120_OR_LATER || !format.equals("parquet")) { + tableSchema = "(i int,s string,ts timestamp,dt date)"; + row1.add("2018-08-20"); + row2.add("2019-08-26"); + } else { + tableSchema = "(i int,s string,ts timestamp)"; + } + hiveShell.execute(String.format("create table db1.src %s %s", tableSchema, suffix)); + hiveShell.execute(String.format("create table db1.dest %s %s", tableSchema, suffix)); // prepare source data with Hive // TABLE keyword in INSERT INTO is mandatory prior to 1.1.0 - hiveShell.execute("insert into table db1.src values " + - "(1,'a','2018-08-20 00:00:00.1','2018-08-20'),(2,'b','2019-08-26 00:00:00.1','2019-08-26')"); + hiveShell.execute(String.format("insert into table db1.src values (%s),(%s)", + toRowValue(row1), toRowValue(row2))); // populate dest table with source table tableEnv.sqlUpdate("insert into db1.dest select * from db1.src"); @@ -158,11 +171,23 @@ public class TableEnvHiveConnectorTest { // verify data on hive side verifyHiveQueryResult("select * from db1.dest", - Arrays.asList("1\ta\t2018-08-20 00:00:00.1\t2018-08-20", "2\tb\t2019-08-26 00:00:00.1\t2019-08-26")); + Arrays.asList( + row1.stream().map(Object::toString).collect(Collectors.joining("\t")), + row2.stream().map(Object::toString).collect(Collectors.joining("\t"; hiveShell.execute("drop database db1 cascade"); } + private String toRowValue(List row) { + return row.stream().map(o -> { + String res = o.toString(); + if (o instanceof String) { + res = "'" + res + "'"; + } + return res; + }).collect(Collectors.joining(",")); + } + @Test public void testDecimal() throws Exception { hiveShell.execute("create database db1");
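The patch above replaces a hard-coded `VALUES` clause with a `toRowValue` helper that single-quotes strings and renders everything else bare, so the same row list can drive both the `INSERT` and the result verification. A rough Python equivalent of that helper (a sketch mirroring the Java, not the test's actual code):

```python
def to_row_value(row):
    # Quote strings with single quotes; render other values with str()
    def render(o):
        return "'" + o + "'" if isinstance(o, str) else str(o)
    return ",".join(render(o) for o in row)

# e.g. building the statement used to seed the source table
row1 = [1, "a", "2018-08-20 00:00:00.1", "2018-08-20"]
stmt = "insert into table db1.src values (%s)" % to_row_value(row1)
```

Keeping the rows as data (rather than a literal SQL string) is what lets the test drop the `date` column on Hive versions before 1.2.0 without duplicating the statement.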
[flink] branch master updated (a0ceb6f -> c84f206)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from a0ceb6f [hotfix][doc][hive] add information of hive connector jar add c84f206 [FLINK-15322][hive] Parquet test fails with Hive versions prior to 1.2.0 No new revisions were added by this update. Summary of changes: .../connectors/hive/TableEnvHiveConnectorTest.java | 35 ++ 1 file changed, 30 insertions(+), 5 deletions(-)
[flink] branch release-1.10 updated: [hotfix][doc][hive] add information of hive connector jar
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 3da25e1 [hotfix][doc][hive] add information of hive connector jar 3da25e1 is described below commit 3da25e1b51d609a4e6c35a3c15088c9795e52185 Author: bowen.li AuthorDate: Fri Dec 20 11:04:16 2019 -0800 [hotfix][doc][hive] add information of hive connector jar --- docs/dev/table/hive/index.md| 6 ++ docs/dev/table/hive/index.zh.md | 6 ++ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index 4c8f093..ff4dac6 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -97,14 +97,13 @@ We are using Hive 2.3.4 and 1.2.1 as examples here. // we highly recommend using Flink's blink planner with Hive integration flink-table-blink{{ site.scala_version_suffix }}-{{ site.version }}.jar - // Flink's Hive connector + // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars flink-connector-hive{{ site.scala_version_suffix }}-{{ site.version }}.jar // Hadoop dependencies // Pick the correct Hadoop dependency for your project. // Hive 2.3.4 is built with Hadoop 2.7.2. We pick 2.7.5 which flink-shaded-hadoop is pre-built with, // but users can pick their own hadoop version, as long as it's compatible with Hadoop 2.7.2 - flink-hadoop-compatibility{{ site.scala_version_suffix }}-{{ site.version }}.jar flink-shaded-hadoop-2-uber-2.7.5-{{ site.shaded_version }}.jar // Hive dependencies @@ -123,14 +122,13 @@ We are using Hive 2.3.4 and 1.2.1 as examples here. // we highly recommend using Flink's blink planner with Hive integration flink-table-blink{{ site.scala_version_suffix }}-{{ site.version }}.jar - // Flink's Hive connector + // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars flink-connector-hive{{ site.scala_version_suffix }}-{{ site.version }}.jar // Hadoop dependencies // Pick the correct Hadoop dependency for your project. // Hive 1.2.1 is built with Hadoop 2.6.0. We pick 2.6.5 which flink-shaded-hadoop is pre-built with, // but users can pick their own hadoop version, as long as it's compatible with Hadoop 2.6.0 - flink-hadoop-compatibility{{ site.scala_version_suffix }}-{{ site.version }}.jar flink-shaded-hadoop-2-uber-2.6.5-{{ site.shaded_version }}.jar // Hive dependencies diff --git a/docs/dev/table/hive/index.zh.md b/docs/dev/table/hive/index.zh.md index 4c8f093..99dce07 100644 --- a/docs/dev/table/hive/index.zh.md +++ b/docs/dev/table/hive/index.zh.md @@ -97,14 +97,13 @@ We are using Hive 2.3.4 and 1.2.1 as examples here. // we highly recommend using Flink's blink planner with Hive integration flink-table-blink{{ site.scala_version_suffix }}-{{ site.version }}.jar - // Flink's Hive connector + // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars flink-connector-hive{{ site.scala_version_suffix }}-{{ site.version }}.jar // Hadoop dependencies // Pick the correct Hadoop dependency for your project. // Hive 2.3.4 is built with Hadoop 2.7.2. We pick 2.7.5 which flink-shaded-hadoop is pre-built with, // but users can pick their own hadoop version, as long as it's compatible with Hadoop 2.7.2 - flink-hadoop-compatibility{{ site.scala_version_suffix }}-{{ site.version }}.jar flink-shaded-hadoop-2-uber-2.7.5-{{ site.shaded_version }}.jar // Hive dependencies @@ -123,14 +122,13 @@ We are using Hive 2.3.4 and 1.2.1 as examples here. // we highly recommend using Flink's blink planner with Hive integration flink-table-blink{{ site.scala_version_suffix }}-{{ site.version }}.jar - // Flink's Hive connector + // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars flink-connector-hive{{ site.scala_version_suffix }}-{{ site.version }}.jar // Hadoop dependencies // Pick the correct Hadoop dependency for your project. // Hive 1.2.1 is built with Hadoop 2.6.0. We pick 2.6.5 which flink-shaded-hadoop is pre-built with, // but users can pick their own hadoop version, as long as it's compatible with Hadoop 2.6.0 - flink-hadoop-compatibility{{ site.scala_version_suffix }}-{{ site.version }}.jar flink-shaded-hadoop-2-uber-2.6.5-{{ site.shaded_version }}.jar // Hive dependencies
[flink] branch master updated (57be6ed -> a0ceb6f)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 57be6ed [FLINK-15287][hive] Pack flink-hadoop-compatibility and flink-orc into flink-hive add a0ceb6f [hotfix][doc][hive] add information of hive connector jar No new revisions were added by this update. Summary of changes: docs/dev/table/hive/index.md| 6 ++ docs/dev/table/hive/index.zh.md | 6 ++ 2 files changed, 4 insertions(+), 8 deletions(-)
[flink] branch release-1.9 updated (0f701a0 -> 4e07b8c)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git. from 0f701a0 [hotfix] Set initial bestCandidateScore in LocationPreferenceSlotSelectionStrategy to Double.NEGATIVE_INFINITY new cca2387 [FLINK-15234][hive] hive table created from flink catalog table cannot have null properties in parameters new 4e07b8c [FLINK-15240][hive] is_generic key is missing for Flink table stored in HiveCatalog The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/table/catalog/hive/HiveCatalog.java | 47 +++-- .../flink/connectors/hive/HiveTableSinkTest.java | 22 -- .../hive/HiveCatalogGenericMetadataTest.java | 2 +- .../catalog/hive/HiveCatalogHiveMetadataTest.java | 2 +- .../table/catalog/hive/HiveCatalogITCase.java | 50 -- ...tBase.java => HiveCatalogMetadataTestBase.java} | 2 +- .../flink/table/catalog/hive/HiveCatalogTest.java | 80 ++ .../flink/table/catalog/AbstractCatalogTable.java | 6 ++ 8 files changed, 192 insertions(+), 19 deletions(-) rename flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/{HiveCatalogTestBase.java => HiveCatalogMetadataTestBase.java} (95%) create mode 100644 flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
[flink] 02/02: [FLINK-15240][hive] is_generic key is missing for Flink table stored in HiveCatalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git commit 4e07b8c371a9130a9316ff9d4a731caac06c8df9 Author: bowen.li AuthorDate: Tue Dec 17 15:31:07 2019 -0800 [FLINK-15240][hive] is_generic key is missing for Flink table stored in HiveCatalog backport from 1.10 --- .../flink/table/catalog/hive/HiveCatalog.java | 40 ++--- .../flink/connectors/hive/HiveTableSinkTest.java | 22 -- .../hive/HiveCatalogGenericMetadataTest.java | 2 +- .../catalog/hive/HiveCatalogHiveMetadataTest.java | 2 +- .../table/catalog/hive/HiveCatalogITCase.java | 50 +++--- ...tBase.java => HiveCatalogMetadataTestBase.java} | 2 +- 6 files changed, 101 insertions(+), 17 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index dcfb313..2713923 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -500,10 +500,21 @@ public class HiveCatalog extends AbstractCatalog { // Table properties Map properties = hiveTable.getParameters(); - boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC)); - if (isGeneric) { - properties = retrieveFlinkProperties(properties); + // When retrieving a table, a generic table needs explicitly have a key is_generic = true + // otherwise, this is a Hive table if 1) the key is missing 2) is_generic = false + // this is opposite to creating a table. See instantiateHiveTable() + + if (!properties.containsKey(CatalogConfig.IS_GENERIC)) { + // must be a hive table + properties.put(CatalogConfig.IS_GENERIC, String.valueOf(false)); + } else { + boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC)); + + if (isGeneric) { + properties = retrieveFlinkProperties(properties); + } } + String comment = properties.remove(HiveCatalogConfig.COMMENT); // Table schema @@ -555,11 +566,24 @@ public class HiveCatalog extends AbstractCatalog { properties.put(HiveCatalogConfig.COMMENT, table.getComment()); } - boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC)); + // Make sure there's no null in properties + properties = cleanNullProperties(properties); - if (isGeneric) { + // When creating a table, A hive table needs explicitly have a key is_generic = false + // otherwise, this is a generic table if 1) the key is missing 2) is_generic = true + // this is opposite to reading a table and instantiating a CatalogTable. See instantiateCatalogTable() + if (!properties.containsKey(CatalogConfig.IS_GENERIC)) { + // must be a generic catalog + properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true)); properties = maskFlinkProperties(properties); + } else { + boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC)); + + if (isGeneric) { + properties = maskFlinkProperties(properties); + } } + // Table properties hiveTable.setParameters(properties); @@ -602,6 +626,12 @@ public class HiveCatalog extends AbstractCatalog { return hiveTable; } + private static Map cleanNullProperties(Map properties) { + return properties.entrySet().stream() + .filter(e -> e.getKey() != null && e.getValue() != null) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + } + private static void setStorageFormat(StorageDescriptor sd, Map properties) { // TODO: allow user to specify storage format. Simply use text format for now String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT; diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.ja
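The key point of this commit is that the `is_generic` flag defaults asymmetrically: when *reading* a table back from the metastore, a missing key means a native Hive table, while when *creating* a table, a missing key means a generic (Flink) table. A minimal Python sketch of just that decision logic (hypothetical helper names; the real code lives in `HiveCatalog`'s `instantiateCatalogTable`/`instantiateHiveTable`):

```python
IS_GENERIC = "is_generic"

def is_generic_on_read(properties):
    # Retrieving: a generic table must explicitly carry is_generic = true;
    # a missing key means a native Hive table.
    return properties.get(IS_GENERIC, "false").lower() == "true"

def is_generic_on_write(properties):
    # Creating: a Hive table must explicitly carry is_generic = false;
    # a missing key means a generic (Flink) table.
    return properties.get(IS_GENERIC, "true").lower() == "true"
```

The two opposite defaults are what the in-diff comments ("this is opposite to creating a table" / "opposite to reading a table") describe.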
[flink] 01/02: [FLINK-15234][hive] hive table created from flink catalog table cannot have null properties in parameters
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git commit cca2387a48e42950f0e53f8d9f002a4e0acf0c94 Author: bowen AuthorDate: Tue Dec 17 15:10:46 2019 -0800 [FLINK-15234][hive] hive table created from flink catalog table cannot have null properties in parameters backport from 1.10 --- .../flink/table/catalog/hive/HiveCatalog.java | 7 +- .../flink/table/catalog/hive/HiveCatalogTest.java | 80 ++ .../flink/table/catalog/AbstractCatalogTable.java | 6 ++ 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index dd50ce2..dcfb313 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -542,7 +542,8 @@ public class HiveCatalog extends AbstractCatalog { } } - private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + @VisibleForTesting + protected static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table) { // let Hive set default parameters for us, e.g. serialization.format Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(), tablePath.getObjectName()); @@ -550,7 +551,9 @@ public class HiveCatalog extends AbstractCatalog { Map properties = new HashMap<>(table.getProperties()); // Table comment - properties.put(HiveCatalogConfig.COMMENT, table.getComment()); + if (table.getComment() != null) { + properties.put(HiveCatalogConfig.COMMENT, table.getComment()); + } boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC)); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java new file mode 100644 index 000..23ccb45 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.config.CatalogConfig; +import org.apache.flink.table.descriptors.FileSystem; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test for HiveCatalog. + */ +public class HiveCatalogTest { + + TableSchema schema = TableSchema.builder() + .field("name", DataTypes.STRING()) + .field("age", DataTypes.INT()) + .build(); + + @Test + public void testCreateGenericTable() { + Table hiveTable = HiveCatalog.instantiateHiveTable( + new ObjectPath("test", "test"), + new CatalogTableImpl( + schema, + new FileSystem().path("/test_path").toProperties(), + null + )); + + Map prop = hiveTable.getParameters(); + assertEquals(prop.remove(CatalogConfig.IS_GENERIC), String.valueOf("true&
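This fix guards against Hive's metastore rejecting null entries: the comment is only written when present, and the companion `cleanNullProperties` helper (in the FLINK-15240 commit above) strips null keys/values before the parameters reach the metastore. A Python sketch of the same filtering, using `None` to stand in for Java `null` (illustrative, not Flink's code):

```python
def clean_null_properties(properties):
    # Drop any entry whose key or value is None, mirroring cleanNullProperties()
    return {k: v for k, v in properties.items() if k is not None and v is not None}

def put_comment(properties, comment):
    # Only record a comment when one was actually supplied
    if comment is not None:
        properties["comment"] = comment
    return properties
```

With both guards applied, a catalog table created without a comment produces a parameter map that contains no null at all.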
[flink] branch release-1.10 updated: [FLINK-15205][hive][document] Update hive partition and orc for read_write_hive
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 58eb541 [FLINK-15205][hive][document] Update hive partition and orc for read_write_hive 58eb541 is described below commit 58eb541dd9cdf48aefe7ef2862915b06e15e10b2 Author: JingsongLi AuthorDate: Mon Dec 16 15:58:02 2019 +0800 [FLINK-15205][hive][document] Update hive partition and orc for read_write_hive this closes #10590. --- docs/dev/table/hive/read_write_hive.md| 37 --- docs/dev/table/hive/read_write_hive.zh.md | 37 --- 2 files changed, 68 insertions(+), 6 deletions(-) diff --git a/docs/dev/table/hive/read_write_hive.md b/docs/dev/table/hive/read_write_hive.md index daa62a3..f0ffa22 100644 --- a/docs/dev/table/hive/read_write_hive.md +++ b/docs/dev/table/hive/read_write_hive.md @@ -91,7 +91,7 @@ root |-- name: value |-- type: DOUBLE - +# -- Select from hive table or hive view -- Flink SQL> SELECT * FROM mytable; name value @@ -112,16 +112,47 @@ __ __ ## Writing To Hive -Similarly, data can be written into hive using an `INSERT INTO` clause. +Similarly, data can be written into hive using an `INSERT` clause. + +Consider there is an example table named "mytable" with two columns: name and age, in string and int type. {% highlight bash %} -Flink SQL> INSERT INTO mytable (name, value) VALUES ('Tom', 4.72); +# -- INSERT INTO will append to the table or partition, keeping the existing data intact -- +Flink SQL> INSERT INTO mytable SELECT 'Tom', 25; + +# -- INSERT OVERWRITE will overwrite any existing data in the table or partition -- +Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25; +{% endhighlight %} + +We support partitioned table too, Consider there is a partitioned table named myparttable with four columns: name, age, my_type and my_date, in types .. my_type and my_date are the partition keys. + +{% highlight bash %} +# -- Insert with static partition -- +Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25; + +# -- Insert with dynamic partition -- +Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08'; + +# -- Insert with static(my_type) and dynamic(my_date) partition -- +Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08'; {% endhighlight %} ## Formats We have tested on the following of table storage formats: text, csv, SequenceFile, ORC, and Parquet. +# -- ORC Vectorized Optimization -- +Optimization is used automatically when the following conditions are met: + +- Columns without complex data type, like hive types: List, Map, Struct, Union. +- Hive version greater than or equal to version 2.0.0. + +This feature is turned on by default. If there is a problem, you can use this config option to close ORC Vectorized Optimization: + +{% highlight bash %} +table.exec.hive.fallback-mapred-reader=true +{% endhighlight %} + ## Roadmap We are planning and actively working on supporting features like diff --git a/docs/dev/table/hive/read_write_hive.zh.md b/docs/dev/table/hive/read_write_hive.zh.md index daa62a3..f0ffa22 100644 --- a/docs/dev/table/hive/read_write_hive.zh.md +++ b/docs/dev/table/hive/read_write_hive.zh.md @@ -91,7 +91,7 @@ root |-- name: value |-- type: DOUBLE - +# -- Select from hive table or hive view -- Flink SQL> SELECT * FROM mytable; name value @@ -112,16 +112,47 @@ __ __ ## Writing To Hive -Similarly, data can be written into hive using an `INSERT INTO` clause. +Similarly, data can be written into hive using an `INSERT` clause. + +Consider there is an example table named "mytable" with two columns: name and age, in string and int type. {% highlight bash %} -Flink SQL> INSERT INTO mytable (name, value) VALUES ('Tom', 4.72); +# -- INSERT INTO will append to the table or partition, keeping the existing data intact -- +Flink SQL> INSERT INTO mytable SELECT 'Tom', 25; + +# -- INSERT OVERWRITE will overwrite any existing data in the table or partition -- +Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25; +{% endhighlight %} + +We support partitioned table too, Consider there is a partitioned table named myparttable with four columns: name, age, my_type and my_date, in types .. my_type and my_date are the partition keys. + +{% highlight bash %} +# -- Insert with static partition -- +Flink SQL>
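The doc change distinguishes static partitions (fixed in a `PARTITION (...)` clause) from dynamic ones (supplied by the `SELECT`). As a hedged illustration only — this is plain string assembly, not any Flink API — the three documented statement shapes can be generated like so:

```python
def insert_overwrite(table, select, static_partitions=None):
    # Static partition keys go into a PARTITION (...) clause;
    # dynamic partition values stay as trailing SELECT columns.
    partition = ""
    if static_partitions:
        spec = ", ".join("%s='%s'" % kv for kv in static_partitions.items())
        partition = " PARTITION (%s)" % spec
    return "INSERT OVERWRITE %s%s %s" % (table, partition, select)
```

For example, `insert_overwrite("myparttable", "SELECT 'Tom', 25, '2019-08-08'", {"my_type": "type_1"})` reproduces the mixed static/dynamic form from the doc.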
[flink] branch master updated (4e3b8cc -> 1384381)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 4e3b8cc [FLINK-15256][hive] HiveModuleFactory should take hive-version as required supported property add 1384381 [FLINK-15205][hive][document] Update hive partition and orc for read_write_hive No new revisions were added by this update. Summary of changes: docs/dev/table/hive/read_write_hive.md| 37 --- docs/dev/table/hive/read_write_hive.zh.md | 37 --- 2 files changed, 68 insertions(+), 6 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15256][hive] HiveModuleFactory should take hive-version as required supported property
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new aeb00ab [FLINK-15256][hive] HiveModuleFactory should take hive-version as required supported property aeb00ab is described below commit aeb00ab478d5d518bec6722378ae05e966c0ff8a Author: bowen.li AuthorDate: Fri Dec 13 14:15:48 2019 -0800 [FLINK-15256][hive] HiveModuleFactory should take hive-version as required supported property HiveModuleFactory should have hive-version as property. Currently it cannot pick up hive-version from service discovery this closes #10577. --- .../apache/flink/table/module/hive/HiveModule.java | 11 + .../table/module/hive/HiveModuleDescriptor.java| 5 +- .../flink/table/module/hive/HiveModuleFactory.java | 4 +- .../module/hive/HiveModuleDescriptorTest.java | 5 +- .../table/module/hive/HiveModuleFactoryTest.java | 55 ++ .../client/gateway/local/ExecutionContextTest.java | 6 ++- .../test/resources/test-sql-client-modules.yaml| 5 ++ 7 files changed, 82 insertions(+), 9 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java index 048e416..588eec6 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java @@ -23,21 +23,28 @@ import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.module.Module; +import org.apache.flink.util.StringUtils; import org.apache.hadoop.hive.ql.exec.FunctionInfo; import java.util.Optional; import java.util.Set; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * Module to provide Hive built-in metadata. */ public class HiveModule implements Module { private final HiveFunctionDefinitionFactory factory; + private final String hiveVersion; private final HiveShim hiveShim; public HiveModule(String hiveVersion) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion), "hiveVersion cannot be null"); + + this.hiveVersion = hiveVersion; this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); this.factory = new HiveFunctionDefinitionFactory(hiveShim); } @@ -55,4 +62,8 @@ public class HiveModule implements Module { Optional.of(factory.createFunctionDefinitionFromHiveFunction(name, info.get().getFunctionClass().getName())) : Optional.empty(); } + + public String getHiveVersion() { + return hiveVersion; + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptor.java index 5391e70..0acd653 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptor.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleDescriptor.java @@ -34,14 +34,11 @@ import static org.apache.flink.util.Preconditions.checkArgument; public class HiveModuleDescriptor extends ModuleDescriptor { private String hiveVersion; - public HiveModuleDescriptor() { + public HiveModuleDescriptor(String hiveVersion) { super(MODULE_TYPE_HIVE); - } - public HiveModuleDescriptor hiveVersion(String hiveVersion) { checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion)); this.hiveVersion = hiveVersion; - return this; } @Override diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java index 48dcd9f..aabdf54 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java @@ -23,7 +23,7 @@ import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.ModuleFactory; import org.apache.flink.table.mod
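The core of this patch is the eager `checkArgument` guard: `HiveModule` now rejects a null or whitespace-only `hive-version` at construction time instead of failing later during service discovery. A Python sketch of the same fail-fast pattern (illustrative only; the real class loads a `HiveShim` and function factory as well):

```python
class HiveModule:
    """Sketch of the constructor guard added in the patch."""

    def __init__(self, hive_version):
        # Mirrors checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion), ...)
        if hive_version is None or not hive_version.strip():
            raise ValueError("hiveVersion cannot be null")
        self.hive_version = hive_version

    def get_hive_version(self):
        # New accessor so a factory test can assert the discovered version
        return self.hive_version
```

Validating in the constructor is what makes the new `HiveModuleFactoryTest` possible: the factory can create a module from discovered properties and assert `getHiveVersion()` round-trips.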
[flink] branch master updated (974627d -> 4e3b8cc)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 974627d [FLINK-13373][docs] Merge bash and windows local setup and move to deployment add 4e3b8cc [FLINK-15256][hive] HiveModuleFactory should take hive-version as required supported property No new revisions were added by this update. Summary of changes: .../apache/flink/table/module/hive/HiveModule.java | 11 + .../table/module/hive/HiveModuleDescriptor.java| 5 +- .../flink/table/module/hive/HiveModuleFactory.java | 4 +- .../module/hive/HiveModuleDescriptorTest.java | 5 +- .../table/module/hive/HiveModuleFactoryTest.java | 55 ++ .../client/gateway/local/ExecutionContextTest.java | 6 ++- .../test/resources/test-sql-client-modules.yaml| 5 ++ 7 files changed, 82 insertions(+), 9 deletions(-) create mode 100644 flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleFactoryTest.java
[flink] branch release-1.10 updated: [FLINK-15254][sql cli][module] modules in SQL CLI yaml should preserve order
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 6dd510e [FLINK-15254][sql cli][module] modules in SQL CLI yaml should preserve order 6dd510e is described below commit 6dd510e6efb372f89f4b6c7cf82064d4fa21da0d Author: bowen.li AuthorDate: Fri Dec 13 16:04:16 2019 -0800 [FLINK-15254][sql cli][module] modules in SQL CLI yaml should preserve order currently the module map is a hash map in sql cli, which doesn't preserve module loading order from yaml. fix it by always using a linked hash map this closes #10578. --- .../flink/table/client/config/Environment.java | 6 ++--- .../client/gateway/local/EnvironmentTest.java | 28 ++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java index 506c23b..cb72b70 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java @@ -69,7 +69,7 @@ public class Environment { private DeploymentEntry deployment; public Environment() { - this.modules = Collections.emptyMap(); + this.modules = new LinkedHashMap<>(); this.catalogs = Collections.emptyMap(); this.tables = Collections.emptyMap(); this.functions = Collections.emptyMap(); @@ -83,7 +83,7 @@ public class Environment { } public void setModules(List> modules) { - this.modules = new HashMap<>(modules.size()); + this.modules = new LinkedHashMap<>(modules.size()); modules.forEach(config -> { final ModuleEntry entry = ModuleEntry.create(config); @@ -235,7 +235,7 @@ public class Environment { final Environment mergedEnv = new Environment(); // 
merge modules - final Map modules = new HashMap<>(env1.getModules()); + final Map modules = new LinkedHashMap<>(env1.getModules()); modules.putAll(env2.getModules()); mergedEnv.modules = modules; diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java index e4e2e89..49af642 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java @@ -26,6 +26,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -110,6 +111,33 @@ public class EnvironmentTest { createModule("module2", "test"))); } + @Test + public void testModuleOrder() { + Environment env1 = new Environment(); + Environment env2 = new Environment(); + env1.setModules(Arrays.asList( + createModule("b", "test"), + createModule("d", "test"))); + + env2.setModules(Arrays.asList( + createModule("c", "test"), + createModule("a", "test"))); + + assertEquals( + Arrays.asList("b", "d"), new ArrayList<>(env1.getModules().keySet()) + ); + + assertEquals( + Arrays.asList("c", "a"), new ArrayList<>(env2.getModules().keySet()) + ); + + Environment env = Environment.merge(env1, env2); + + assertEquals( + Arrays.asList("b", "d", "c", "a"), new ArrayList<>(env.getModules().keySet()) + ); + } + private static Map createCatalog(String name, String type) { Map prop = new HashMap<>();
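The commit above swaps `HashMap` for `LinkedHashMap` in three places so the module order declared in the yaml file survives both loading and `Environment.merge`. The behaviour rests entirely on `java.util.LinkedHashMap`'s insertion-order iteration; the following stdlib-only sketch (a hypothetical `ModuleOrder` class, not Flink code) mirrors the merge logic and the expectations of the new `testModuleOrder` case:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ModuleOrder {

    // Mirrors the merge logic in the diff: copy env1's modules first, then
    // append env2's. LinkedHashMap iterates in insertion order, so the yaml
    // declaration order survives; a plain HashMap would iterate in hash order.
    static List<String> mergedModuleOrder(Map<String, String> env1, Map<String, String> env2) {
        Map<String, String> merged = new LinkedHashMap<>(env1);
        merged.putAll(env2);
        return new ArrayList<>(merged.keySet());
    }

    public static void main(String[] args) {
        Map<String, String> env1 = new LinkedHashMap<>();
        env1.put("b", "test");
        env1.put("d", "test");

        Map<String, String> env2 = new LinkedHashMap<>();
        env2.put("c", "test");
        env2.put("a", "test");

        // Same expectation as the testModuleOrder case added by this commit.
        System.out.println(mergedModuleOrder(env1, env2)); // [b, d, c, a]
    }
}
```

Had the map stayed a `HashMap`, the resulting order would depend on key hashing, which is exactly the bug FLINK-15254 describes.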
[flink] branch master updated (387c8b6 -> 6a0570d)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 387c8b6 [hotfix][doc][hive] add Hive connector to SQL's connector page add 6a0570d [FLINK-15254][sql cli][module] modules in SQL CLI yaml should preserve order No new revisions were added by this update. Summary of changes: .../flink/table/client/config/Environment.java | 6 ++--- .../client/gateway/local/EnvironmentTest.java | 28 ++ 2 files changed, 31 insertions(+), 3 deletions(-)
[flink] branch master updated (bd15e36 -> 387c8b6)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from bd15e36 [FLINK-15263][hive][doc] add dedicated page for HiveCatalog add 387c8b6 [hotfix][doc][hive] add Hive connector to SQL's connector page No new revisions were added by this update. Summary of changes: docs/dev/table/connect.md | 11 +++ docs/dev/table/connect.zh.md | 11 +++ docs/dev/table/hive/index.md | 9 + docs/dev/table/hive/index.zh.md | 9 + docs/dev/table/hive/read_write_hive.md| 3 ++- docs/dev/table/hive/read_write_hive.zh.md | 3 ++- 6 files changed, 44 insertions(+), 2 deletions(-)
[flink] branch release-1.10 updated: [hotfix][doc][hive] add Hive connector to SQL's connector page
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new bac74ec [hotfix][doc][hive] add Hive connector to SQL's connector page bac74ec is described below commit bac74ecc87995dc4c73290d213ab8bc7af766d36 Author: bowen.li AuthorDate: Sun Dec 15 20:56:19 2019 -0800 [hotfix][doc][hive] add Hive connector to SQL's connector page --- docs/dev/table/connect.md | 11 +++ docs/dev/table/connect.zh.md | 11 +++ docs/dev/table/hive/index.md | 9 + docs/dev/table/hive/index.zh.md | 9 + docs/dev/table/hive/read_write_hive.md| 3 ++- docs/dev/table/hive/read_write_hive.zh.md | 3 ++- 6 files changed, 44 insertions(+), 2 deletions(-) diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index a3a8dcd..4a06472 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -1258,6 +1258,17 @@ CREATE TABLE MyUserTable ( {% top %} + +### Hive Connector + +Source: Batch +Sink: Batch + +Please refer to [Hive integration]({{ site.baseurl }}/dev/table/hive/). + +{% top %} + + Table Formats - diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md index 67bac63..549222d 100644 --- a/docs/dev/table/connect.zh.md +++ b/docs/dev/table/connect.zh.md @@ -1258,6 +1258,17 @@ CREATE TABLE MyUserTable ( {% top %} + +### Hive Connector + +Source: Batch +Sink: Batch + +Please refer to [Hive integration]({{ site.baseurl }}/dev/table/hive/). + +{% top %} + + Table Formats - diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index 28cc118..4c8f093 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -295,6 +295,15 @@ catalogs: + +## DDL + +DDL to create Hive tables, views, partitions, functions within Flink will be supported soon. + +## DML + +Flink supports DML writing to Hive tables. 
Please refer to [Reading & Writing Hive Tables]({{ site.baseurl }}/dev/table/hive/read_write_hive.html) for details. + ## Supported Types Currently `HiveCatalog` supports most Flink data types with the following mapping: diff --git a/docs/dev/table/hive/index.zh.md b/docs/dev/table/hive/index.zh.md index 28cc118..4c8f093 100644 --- a/docs/dev/table/hive/index.zh.md +++ b/docs/dev/table/hive/index.zh.md @@ -295,6 +295,15 @@ catalogs: + +## DDL + +DDL to create Hive tables, views, partitions, and functions within Flink will be supported soon. + +## DML + +Flink supports DML writing to Hive tables. Please refer to [Reading & Writing Hive Tables]({{ site.baseurl }}/dev/table/hive/read_write_hive.html) for details. + ## Supported Types Currently `HiveCatalog` supports most Flink data types with the following mapping: diff --git a/docs/dev/table/hive/read_write_hive.md b/docs/dev/table/hive/read_write_hive.md index 9814ceb..daa62a3 100644 --- a/docs/dev/table/hive/read_write_hive.md +++ b/docs/dev/table/hive/read_write_hive.md @@ -22,7 +22,8 @@ specific language governing permissions and limitations under the License. --> -Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write Hive data as an alternative to Hive's batch engine. Be sure to follow the instructions to include the correct [dependencies]({{ site.baseurl }}/dev/table/hive/#dependencies) in your application. +Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write Hive data as an alternative to Hive's batch engine. +Be sure to follow the instructions to include the correct [dependencies]({{ site.baseurl }}/dev/table/hive/#dependencies) in your application.
* This will be replaced by the TOC {:toc} diff --git a/docs/dev/table/hive/read_write_hive.zh.md b/docs/dev/table/hive/read_write_hive.zh.md index 9814ceb..daa62a3 100644 --- a/docs/dev/table/hive/read_write_hive.zh.md +++ b/docs/dev/table/hive/read_write_hive.zh.md @@ -22,7 +22,8 @@ specific language governing permissions and limitations under the License. --> -Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write Hive data as an alternative to Hive's batch engine. Be sure to follow the instructions to include the correct [dependencies]({{ site.baseurl }}/dev/table/hive/#dependencies) in your application. +Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and write Hive data as an alternative to Hive's batch engine. +Be sure to follow the instructions to include the correct [dependencies]({{ site.baseurl }}/dev/table/hive/#dependencies) in your application. * This will be replaced by the TOC {:toc}
[flink] branch release-1.10 updated: [FLINK-15263][hive][doc] add dedicated page for HiveCatalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 1d41cae [FLINK-15263][hive][doc] add dedicated page for HiveCatalog 1d41cae is described below commit 1d41caee1a2cb4ade9039a66bf9bb273107393e1 Author: bowen.li AuthorDate: Sun Dec 15 00:44:22 2019 -0800 [FLINK-15263][hive][doc] add dedicated page for HiveCatalog this closes #10581. --- docs/dev/table/hive/hive_catalog.md| 248 + docs/dev/table/hive/hive_catalog.zh.md | 248 + docs/dev/table/hive/index.md | 25 +++- docs/dev/table/hive/index.zh.md| 25 +++- 4 files changed, 536 insertions(+), 10 deletions(-) diff --git a/docs/dev/table/hive/hive_catalog.md b/docs/dev/table/hive/hive_catalog.md new file mode 100644 index 000..81b3b1e --- /dev/null +++ b/docs/dev/table/hive/hive_catalog.md @@ -0,0 +1,248 @@ +--- +title: "HiveCatalog" +nav-parent_id: hive_tableapi +nav-pos: 1 +--- + + +Hive Metastore has evolved into the de facto metadata hub over the years in the Hadoop ecosystem. Many companies have a single +Hive Metastore service instance in production to manage all of their metadata, either Hive metadata or non-Hive metadata, + as the source of truth. + +For users who have both Hive and Flink deployments, `HiveCatalog` enables them to use Hive Metastore to manage Flink's metadata. + +For users who have just a Flink deployment, `HiveCatalog` is the only persistent catalog provided out of the box by Flink. +Without a persistent catalog, users using [Flink SQL DDL]({{ site.baseurl }}/dev/table/sql.html#specifying-a-ddl) have to repeatedly +create meta-objects like a Kafka table in each session, which wastes a lot of time. `HiveCatalog` fills this gap by empowering +users to create tables and other meta-objects only once, and conveniently reference and manage them later on across sessions.
+ + +## Set up HiveCatalog + +### Dependencies + +Setting up a `HiveCatalog` in Flink requires the same [dependencies]({{ site.baseurl }}/dev/table/hive/#dependencies) +as those of an overall Flink-Hive integration. + +### Configuration + +Setting up a `HiveCatalog` in Flink requires the same [configuration]({{ site.baseurl }}/dev/table/hive/#connecting-to-hive) +as those of an overall Flink-Hive integration. + + +## How to use HiveCatalog + +Once configured properly, `HiveCatalog` should just work out of the box. Users can create Flink meta-objects with DDL, and should +see them immediately afterwards. + +### Example + +We will walk through a simple example here. + +#### step 1: set up a Hive Metastore + +Have a Hive Metastore running. + +Here, we set up a local Hive Metastore and our `hive-site.xml` file in local path `/opt/hive-conf/hive-site.xml`. +We have some configs like the following: + +{% highlight xml %} +<configuration> + <property> + <name>javax.jdo.option.ConnectionURL</name> + <value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value> + <description>metadata is stored in a MySQL server</description> + </property> + + <property> + <name>javax.jdo.option.ConnectionDriverName</name> + <value>com.mysql.jdbc.Driver</value> + <description>MySQL JDBC driver class</description> + </property> + + <property> + <name>javax.jdo.option.ConnectionUserName</name> + <value>...</value> + <description>user name for connecting to mysql server</description> + </property> + + <property> + <name>javax.jdo.option.ConnectionPassword</name> + <value>...</value> + <description>password for connecting to mysql server</description> + </property> + + <property> + <name>hive.metastore.uris</name> + <value>thrift://localhost:9083</value> + <description>IP address (or fully-qualified domain name) and port of the metastore host</description> + </property> + + <property> + <name>hive.metastore.schema.verification</name> + <value>true</value> + </property> +</configuration> +{% endhighlight %} + + +Test the connection to the HMS with the Hive CLI. Running some commands, we can see that we have a database named `default` and there's no table in it.
+ + +{% highlight bash %} + +hive> show databases; +OK +default +Time taken: 0.032 seconds, Fetched: 1 row(s) + +hive> show tables; +OK +Time taken: 0.028 seconds, Fetched: 0 row(s) +{% endhighlight %} + + +#### step 2: configure Flink cluster and SQL CLI + +Add all Hive dependencies to the `/lib` dir in the Flink distribution, and modify the SQL CLI's yaml config file `sql-cli-defaults.yaml` as follows: + +{% highlight yaml %} + +execution: +    planner: blink +    type: streaming +    ... +    current-catalog: myhive  # set the HiveCatalog as the current catalog of the session +    current-database: mydatabase + +catalogs: +   - name: myhive +     type: hive +     hive-conf-dir: /opt/hive-conf  # contains hive-site.xml +     hive-version: 2.3.4 +{% endhighlight %} + + +#### step 3: set up a Kafka cluster + +Bootstrap a local Kafka 2.3.0 cluster with a topic named "test", and produce some simple data to the topic as tuples of name and age. + +{% highlight bash %} + +localhost$ bin/kafka-console-producer.sh --broke
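The walkthrough's `hive-site.xml` points clients at the metastore via `hive.metastore.uris` (`thrift://localhost:9083`); in general that property may carry a comma-separated list of Thrift endpoints which a client tries in order. A stdlib-only sketch of splitting such a value into host:port pairs (the hypothetical `MetastoreUris` helper below is illustrative, not Hive client code):

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class MetastoreUris {

    // Split a (possibly comma-separated) hive.metastore.uris value and parse
    // each entry into a host:port endpoint, rejecting non-thrift schemes.
    static List<String> parse(String uris) {
        List<String> endpoints = new ArrayList<>();
        for (String part : uris.split(",")) {
            URI uri = URI.create(part.trim());
            if (!"thrift".equals(uri.getScheme())) {
                throw new IllegalArgumentException("expected thrift:// URI, got: " + part);
            }
            endpoints.add(uri.getHost() + ":" + uri.getPort());
        }
        return endpoints;
    }

    public static void main(String[] args) {
        // The value from the walkthrough's hive-site.xml.
        System.out.println(parse("thrift://localhost:9083")); // [localhost:9083]
    }
}
```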
[flink] branch master updated (7f8bd71 -> bd15e36)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 7f8bd71 [FLINK-15128][hive][doc] Document support for Hive timestamp type add bd15e36 [FLINK-15263][hive][doc] add dedicated page for HiveCatalog No new revisions were added by this update. Summary of changes: docs/dev/table/hive/hive_catalog.md| 248 + docs/dev/table/hive/hive_catalog.zh.md | 248 + docs/dev/table/hive/index.md | 25 +++- docs/dev/table/hive/index.zh.md| 25 +++- 4 files changed, 536 insertions(+), 10 deletions(-) create mode 100644 docs/dev/table/hive/hive_catalog.md create mode 100644 docs/dev/table/hive/hive_catalog.zh.md
[flink] branch release-1.10 updated: [FLINK-15128][hive][doc] Document support for Hive timestamp type
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 3f65a3f [FLINK-15128][hive][doc] Document support for Hive timestamp type 3f65a3f is described below commit 3f65a3fe48970d338ca79a2f0ee7125f4376f617 Author: Rui Li AuthorDate: Thu Dec 12 17:46:49 2019 +0800 [FLINK-15128][hive][doc] Document support for Hive timestamp type this closes #10543. --- docs/dev/table/hive/index.md| 3 ++- docs/dev/table/hive/index.zh.md | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index 7425467..879e9e9 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -341,7 +341,7 @@ Currently `HiveCatalog` supports most Flink data types with the following mappin DATE -TIMESTAMP +TIMESTAMP(9) TIMESTAMP @@ -372,5 +372,6 @@ Currently `HiveCatalog` supports most Flink data types with the following mappin Note that: * Flink doesn't support Hive's `UNION` type +* Hive's `TIMESTAMP` always has precision 9 and doesn't support other precisions. As a result, `HiveCatalog` cannot store `TIMESTAMP` columns whose precisions are not 9. Hive UDFs, on the other hand, can process `TIMESTAMP` values with a precision <= 9.
* Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET` * Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type yet diff --git a/docs/dev/table/hive/index.zh.md b/docs/dev/table/hive/index.zh.md index 7425467..879e9e9 100644 --- a/docs/dev/table/hive/index.zh.md +++ b/docs/dev/table/hive/index.zh.md @@ -341,7 +341,7 @@ Currently `HiveCatalog` supports most Flink data types with the following mappin DATE -TIMESTAMP +TIMESTAMP(9) TIMESTAMP @@ -372,5 +372,6 @@ Currently `HiveCatalog` supports most Flink data types with the following mappin Note that: * Flink doesn't support Hive's `UNION` type +* Hive's `TIMESTAMP` always has precision 9 and doesn't support other precisions. As a result, `HiveCatalog` cannot store `TIMESTAMP` columns whose precisions are not 9. Hive UDFs, on the other hand, can process `TIMESTAMP` values with a precision <= 9. * Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET` * Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type yet
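The note above says Hive's `TIMESTAMP` is always precision 9, i.e. nine fractional-second digits (nanoseconds). A small `java.time` sketch (illustrative only, not Flink or Hive code) shows what precision 9 holds and what truncating to precision 3 (milliseconds) would discard, which is presumably why `HiveCatalog` rejects non-9 precisions outright instead of silently losing detail:

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class TimestampPrecision {

    public static void main(String[] args) {
        // Precision 9: nine fractional-second digits, i.e. nanoseconds.
        LocalDateTime ts9 = LocalDateTime.of(2019, 12, 12, 17, 46, 49, 123456789);

        // Truncating to precision 3 (milliseconds) discards the last six digits;
        // the round trip back to precision 9 cannot recover them.
        LocalDateTime ts3 = ts9.truncatedTo(ChronoUnit.MILLIS);

        System.out.println(ts9); // 2019-12-12T17:46:49.123456789
        System.out.println(ts3); // 2019-12-12T17:46:49.123
    }
}
```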
[flink] branch master updated (2ba2a91 -> 7f8bd71)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2ba2a91 [FLINK-15257][hive]convert HiveCatalogITCase.testCsvTableViaAPI() to use blink planner add 7f8bd71 [FLINK-15128][hive][doc] Document support for Hive timestamp type No new revisions were added by this update. Summary of changes: docs/dev/table/hive/index.md| 3 ++- docs/dev/table/hive/index.zh.md | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15257][hive]convert HiveCatalogITCase.testCsvTableViaAPI() to use blink planner
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 154e934 [FLINK-15257][hive]convert HiveCatalogITCase.testCsvTableViaAPI() to use blink planner 154e934 is described below commit 154e934e6c4e7bd32017b63d5b8177b80b51d23f Author: zjuwangg AuthorDate: Mon Dec 16 15:04:23 2019 +0800 [FLINK-15257][hive]convert HiveCatalogITCase.testCsvTableViaAPI() to use blink planner this closes #10589. --- .../apache/flink/table/catalog/hive/HiveCatalogITCase.java | 13 - 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java index 5ad1769..9e2eb0d 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java @@ -18,7 +18,8 @@ package org.apache.flink.table.catalog.hive; -import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; @@ -27,7 +28,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.TableUtils; import org.apache.flink.table.api.Types; -import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableBuilder; import 
org.apache.flink.table.catalog.ObjectPath; @@ -52,6 +52,7 @@ import java.io.FileReader; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -140,8 +141,9 @@ public class HiveCatalogITCase { @Test public void testCsvTableViaAPI() throws Exception { - ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1); - BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv); + EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironment.create(settings); + tableEnv.getConfig().addConfiguration(new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 1)); tableEnv.registerCatalog("myhive", hiveCatalog); tableEnv.useCatalog("myhive"); @@ -190,7 +192,8 @@ public class HiveCatalogITCase { Table t = tableEnv.sqlQuery( String.format("select * from myhive.`default`.%s", sourceTableName)); - List result = tableEnv.toDataSet(t, Row.class).collect(); + List result = TableUtils.collectToList(t); + result.sort(Comparator.comparing(String::valueOf)); // assert query result assertEquals(
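Besides switching to the blink planner, the diff above sorts the collected rows with `Comparator.comparing(String::valueOf)`, because `TableUtils.collectToList` gives no ordering guarantee and an unordered comparison would make the assertion flaky. A stdlib-only sketch of the same stabilization trick (the `StableRows` helper and plain `List<Object>` rows are illustrative stand-ins for Flink's `Row`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class StableRows {

    // Sort rows by their string form so a test can compare against a fixed
    // expected list regardless of the order the runtime produced them in.
    static List<String> sortedByString(List<List<Object>> rows) {
        List<List<Object>> copy = new ArrayList<>(rows);
        copy.sort(Comparator.comparing(String::valueOf));
        List<String> out = new ArrayList<>();
        for (List<Object> row : copy) {
            out.add(String.valueOf(row));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> rows = Arrays.asList(
                Arrays.<Object>asList("3", 3),
                Arrays.<Object>asList("1", 1),
                Arrays.<Object>asList("2", 2));
        System.out.println(sortedByString(rows)); // [[1, 1], [2, 2], [3, 3]]
    }
}
```

An alternative, used elsewhere in the same test class, is to compare `HashSet`s when only membership matters.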
[flink] branch master updated (f6ce8dd -> 2ba2a91)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f6ce8dd [FLINK-15228][docs] Drop vendor specific deployment documentation add 2ba2a91 [FLINK-15257][hive]convert HiveCatalogITCase.testCsvTableViaAPI() to use blink planner No new revisions were added by this update. Summary of changes: .../apache/flink/table/catalog/hive/HiveCatalogITCase.java | 13 - 1 file changed, 8 insertions(+), 5 deletions(-)
[flink] branch master updated (8fa00ef -> 10936d7)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 8fa00ef [hotfix][hive][test] enhance HiveCatalog IT case with 'alter table' and 'drop table' statements add 10936d7 [FLINK-15190][doc]add documentation for DDL in FLIP-69 No new revisions were added by this update. Summary of changes: docs/dev/table/sql.md| 75 +++- docs/dev/table/sql.zh.md | 81 +--- 2 files changed, 151 insertions(+), 5 deletions(-)
[flink] branch release-1.10 updated: [FLINK-15190][doc]add documentation for DDL in FLIP-69
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new b5b364c [FLINK-15190][doc]add documentation for DDL in FLIP-69 b5b364c is described below commit b5b364c941502920cb7d2f508e7f99840f856931 Author: zjuwangg AuthorDate: Fri Dec 13 16:35:04 2019 +0800 [FLINK-15190][doc]add documentation for DDL in FLIP-69 this closes #10565. --- docs/dev/table/sql.md| 75 +++- docs/dev/table/sql.zh.md | 81 +--- 2 files changed, 151 insertions(+), 5 deletions(-) diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 55115f2..6f65aa5 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -1324,7 +1324,7 @@ MATCH_RECOGNIZE ( ## DDL -DDLs are specified with the `sqlUpdate()` method of the `TableEnvironment`. The method returns nothing for a success table creation. A `Table` can be register into the [Catalog](catalogs.html) with a `CREATE TABLE` statement, then can be referenced in SQL queries in method `sqlQuery()` of `TableEnvironment`. +DDLs are specified with the `sqlUpdate()` method of the `TableEnvironment`. The method returns nothing for a successful create/drop/alter database or table operation. A `CatalogTable` can be registered into the [Catalog](catalogs.html) with a `CREATE TABLE` statement, and can then be referenced in SQL queries via the `sqlQuery()` method of `TableEnvironment`. **Note:** Flink's DDL support is not yet feature complete. Queries that include unsupported SQL features cause a `TableException`. The supported features of SQL DDL on batch and streaming tables are listed in the following sections. @@ -1434,6 +1434,79 @@ If the table does not exist, nothing happens.
{% top %} +### Alter Table + +* Rename Table + +{% highlight sql %} +ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name +{% endhighlight %} + +Rename the given table name to another new table name. + +* Set or Alter Table Properties + +{% highlight sql %} +ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...) +{% endhighlight %} + +Set one or more properties in the specified table. If a particular property is already set in the table, override the old value with the new one. + +{% top %} + +### Create Database + +{% highlight sql %} +CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name + [COMMENT database_comment] + WITH (key1=val1, key2=val2, ...) +{% endhighlight %} + +Create a database with the given database properties. If a database with the same name already exists in the catalog, an exception is thrown. + +**IF NOT EXISTS** + +If the database already exists, nothing happens. + +**WITH OPTIONS** + +Database properties used to store extra information related to this database. +The key and value of expression `key1=val1` should both be string literal. + +{% top %} + +### Drop Database + +{% highlight sql %} +DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ] +{% endhighlight %} + +Drop a database with the given database name. If the database to drop does not exist, an exception is thrown. + +**IF EXISTS** + +If the database does not exist, nothing happens. + +**RESTRICT** + +Dropping a non-empty database triggers an exception. Enabled by default. + +**CASCADE** + +Dropping a non-empty database also drops all associated tables and functions. + +{% top %} + +### Alter Database + +{% highlight sql %} +ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...) +{% endhighlight %} + +Set one or more properties in the specified database. If a particular property is already set in the database, override the old value with the new one. 
+ +{% top %} + ## Data Types Please see the dedicated page about [data types](types.html). diff --git a/docs/dev/table/sql.zh.md b/docs/dev/table/sql.zh.md index cab13fb..6f65aa5 100644 --- a/docs/dev/table/sql.zh.md +++ b/docs/dev/table/sql.zh.md @@ -29,7 +29,7 @@ This is a complete list of Data Definition Language (DDL) and Data Manipulation ## Query SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table)). SQL and Table API queries can be seamlessly mixed and are holistically optimized and tra [...] -In order to access a table in a SQL query, it must be [registered in the TableEnvironment](common.html#register-tables-in-the-catalog). A table can be registered from a [TableSource](common.html#register-a-tablesource), [
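The Drop Database section in the documentation above distinguishes `IF EXISTS`, `RESTRICT` (the default), and `CASCADE`. A toy in-memory model encoding those semantics (the hypothetical `DropDatabase` class below is illustrative only, not Flink's catalog implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DropDatabase {

    // Maps database name -> names of tables it contains.
    static final Map<String, Set<String>> catalog = new HashMap<>();

    static void dropDatabase(String name, boolean ifExists, boolean cascade) {
        Set<String> tables = catalog.get(name);
        if (tables == null) {
            if (ifExists) {
                return;                          // DROP DATABASE IF EXISTS: no-op
            }
            throw new IllegalStateException("database does not exist: " + name);
        }
        if (!tables.isEmpty() && !cascade) {     // RESTRICT refuses non-empty databases
            throw new IllegalStateException("database is not empty: " + name);
        }
        catalog.remove(name);                    // CASCADE drops the tables as well
    }

    public static void main(String[] args) {
        catalog.put("db1", Set.of("orders"));
        try {
            dropDatabase("db1", false, false);   // RESTRICT: fails, db1 has a table
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());  // database is not empty: db1
        }
        dropDatabase("db1", false, true);        // CASCADE: succeeds
        System.out.println(catalog.containsKey("db1")); // false
    }
}
```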
[flink] branch release-1.10 updated: [hotfix][hive][test] enhance HiveCatalog IT case with 'alter table' and 'drop table' statements
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new f562fac [hotfix][hive][test] enhance HiveCatalog IT case with 'alter table' and 'drop table' statements f562fac is described below commit f562facedcd0fd5572ecea368bfd84db53c6c8f6 Author: bowen.li AuthorDate: Sun Dec 15 20:38:03 2019 -0800 [hotfix][hive][test] enhance HiveCatalog IT case with 'alter table' and 'drop table' statements --- .../flink/table/catalog/hive/HiveCatalogITCase.java | 21 + 1 file changed, 21 insertions(+) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java index 91aa6eb..5ad1769 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java @@ -119,6 +119,23 @@ public class HiveCatalogITCase { Row.of("3", 3))), new HashSet<>(result) ); + + tableEnv.sqlUpdate("ALTER TABLE test2 RENAME TO newtable"); + + t = tableEnv.sqlQuery("SELECT * FROM myhive.`default`.newtable"); + + result = TableUtils.collectToList(t); + + // assert query result + assertEquals( + new HashSet<>(Arrays.asList( + Row.of("1", 1), + Row.of("2", 2), + Row.of("3", 3))), + new HashSet<>(result) + ); + + tableEnv.sqlUpdate("DROP TABLE newtable"); } @Test @@ -127,6 +144,7 @@ public class HiveCatalogITCase { BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv); tableEnv.registerCatalog("myhive", hiveCatalog); + tableEnv.useCatalog("myhive"); TableSchema schema = TableSchema.builder() .field("name", DataTypes.STRING()) @@ -200,5 +218,8 @@ public class 
HiveCatalogITCase { // No more line assertNull(reader.readLine()); + + tableEnv.sqlUpdate(String.format("DROP TABLE %s", sourceTableName)); + tableEnv.sqlUpdate(String.format("DROP TABLE %s", sinkTableName)); } }
[flink] branch master updated (36e8244 -> 8fa00ef)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 36e8244 [hotfix][doc][hive] add doc on what hive dependencies to put in /lib dir add 8fa00ef [hotfix][hive][test] enhance HiveCatalog IT case with 'alter table' and 'drop table' statements No new revisions were added by this update. Summary of changes: .../flink/table/catalog/hive/HiveCatalogITCase.java | 21 + 1 file changed, 21 insertions(+)
[flink] branch release-1.10 updated: [hotfix][doc][hive] add doc on what hive dependencies to put in /lib dir
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 1cfb233 [hotfix][doc][hive] add doc on what hive dependencies to put in /lib dir 1cfb233 is described below commit 1cfb2339b0d1c72f8768c3503eff7814b65946d0 Author: bowen.li AuthorDate: Sat Dec 14 22:49:44 2019 -0800 [hotfix][doc][hive] add doc on what hive dependencies to put in /lib dir --- docs/dev/table/hive/index.md| 79 +++--- docs/dev/table/hive/index.zh.md | 85 ++--- 2 files changed, 153 insertions(+), 11 deletions(-) diff --git a/docs/dev/table/hive/index.md b/docs/dev/table/hive/index.md index ede376c..7425467 100644 --- a/docs/dev/table/hive/index.md +++ b/docs/dev/table/hive/index.md @@ -29,12 +29,17 @@ under the License. It serves as not only a SQL engine for big data analytics and ETL, but also a data management platform, where data is discovered, defined, and evolved. Flink offers a two-fold integration with Hive. -The first is to leverage Hive's Metastore as a persistent catalog for storing Flink specific metadata across sessions. + +The first is to leverage Hive's Metastore as a persistent catalog with Flink's `HiveCatalog` for storing Flink specific metadata across sessions. +For example, users can store their Kafka or ElasticSearch tables in Hive Metastore by using `HiveCatalog`, and reuse them later on in SQL queries. + The second is to offer Flink as an alternative engine for reading and writing Hive tables. -The hive catalog is designed to be “out of the box” compatible with existing Hive installations. +The `HiveCatalog` is designed to be “out of the box” compatible with existing Hive installations. You do not need to modify your existing Hive Metastore or change the data placement or partitioning of your tables. 
+* Note that we highly recommend using the [blink planner]({{ site.baseurl }}/dev/table/#dependency-structure) with Hive integration. + * This will be replaced by the TOC {:toc} @@ -75,7 +80,71 @@ Flink supports the following Hive versions. ### Dependencies -To integrate with Hive, users need some dependencies in their project. We are using Hive 2.3.4 and 1.2.1 as examples here. +To integrate with Hive, you need some dependencies in the `/lib/` directory of your Flink distribution +to make the integration work in Table API programs or SQL in the SQL Client. + +We are using Hive 2.3.4 and 1.2.1 as examples here. + + + + +{% highlight txt %} + +/flink-{{ site.version }} + /lib + flink-dist{{ site.scala_version_suffix }}-{{ site.version }}.jar + flink-table{{ site.scala_version_suffix }}-{{ site.version }}.jar + // we highly recommend using Flink's blink planner with Hive integration + flink-table-blink{{ site.scala_version_suffix }}-{{ site.version }}.jar + + // Flink's Hive connector + flink-connector-hive{{ site.scala_version_suffix }}-{{ site.version }}.jar + + // Hadoop dependencies + // Pick the correct Hadoop dependency for your project. + // Hive 2.3.4 is built with Hadoop 2.7.2. We pick 2.7.5 which flink-shaded-hadoop is pre-built with, + // but users can pick their own hadoop version, as long as it's compatible with Hadoop 2.7.2 + flink-hadoop-compatibility{{ site.scala_version_suffix }}-{{ site.version }}.jar + flink-shaded-hadoop-2-uber-2.7.5-{{ site.shaded_version }}.jar + + // Hive dependencies + hive-exec-2.3.4.jar + + ...
+{% endhighlight %} + + + +{% highlight txt %} +/flink-{{ site.version }} + /lib + flink-dist{{ site.scala_version_suffix }}-{{ site.version }}.jar + flink-table{{ site.scala_version_suffix }}-{{ site.version }}.jar + // we highly recommend using Flink's blink planner with Hive integration + flink-table-blink{{ site.scala_version_suffix }}-{{ site.version }}.jar + + // Flink's Hive connector + flink-connector-hive{{ site.scala_version_suffix }}-{{ site.version }}.jar + + // Hadoop dependencies + // Pick the correct Hadoop dependency for your project. + // Hive 1.2.1 is built with Hadoop 2.6.0. We pick 2.6.5 which flink-shaded-hadoop is pre-built with, + // but users can pick their own hadoop version, as long as it's compatible with Hadoop 2.6.0 + flink-hadoop-compatibility{{ site.scala_version_suffix }}-{{ site.version }}.jar + flink-shaded-hadoop-2-uber-2.6.5-{{ site.shaded_version }}.jar + + // Hive dependencies + hive-metastore-1.2.1.jar + hive-exec-1.2.1.jar + libfb303-0.9.3.jar + + ... +{% endhighlight %} + + + + +Similarly, if you are building your own program, you need the above dependencies in your Maven pom file. @@ -107,7 +1
[flink] branch master updated (a5cfc3c -> 36e8244)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from a5cfc3c [FLINK-15159][docs] Update the mapping of JSON schema string type to Flink SQL STRING type (#10582) add 36e8244 [hotfix][doc][hive] add doc on what hive dependencies to put in /lib dir No new revisions were added by this update. Summary of changes: docs/dev/table/hive/index.md| 79 +++--- docs/dev/table/hive/index.zh.md | 85 ++--- 2 files changed, 153 insertions(+), 11 deletions(-)