[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15816248#comment-15816248 ] ASF GitHub Bot commented on FLINK-5280: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3039 > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15815979#comment-15815979 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 Hi @fhueske, @wuchong Thank you for your reviews and your help with this PR. I've updated the PR.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15815203#comment-15815203 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3039 Merging
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814642#comment-15814642 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r95339229

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,22 +19,23 @@ package org.apache.flink.table.sources

     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment

    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
    +  * field names and field indices are derived from the returned type.
    +  *
    +  * In case if custom field names are required one need to additionally implement
    --- End diff --

    My mistake, `In case if` is fine here.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814559#comment-15814559 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r95333198

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,22 +19,23 @@ package org.apache.flink.table.sources

     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment

    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
    +  * field names and field indices are derived from the returned type.
    +  *
    +  * In case if custom field names are required one need to additionally implement
    --- End diff --

    I am not sure about this. I've checked it with [Grammarly](grammarly.com), and it does not complain about "In case if" but does complain about "In case of".
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813852#comment-15813852 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r95292990

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,22 +19,23 @@ package org.apache.flink.table.sources

     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment

    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
    +  * field names and field indices are derived from the returned type.
    +  *
    +  * In case if custom field names are required one need to additionally implement
    --- End diff --

    In case if -> In case of
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812080#comment-15812080 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r95178608

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala ---
    @@ -53,7 +53,9 @@ class CsvTableSource(
         ignoreFirstLine: Boolean = false,
         ignoreComments: String = null,
         lenient: Boolean = false)
    -  extends AbstractBatchStreamTableSource[Row]
    +  extends BatchTableSource[Row]
    +  with StreamTableSource[Row]
    +  with DefinedFieldNames
    --- End diff --

    If we define `returnType` as `new RowTypeInfo(fieldTypes, fieldNames)`, we do not need to implement `DefinedFieldNames`.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15811150#comment-15811150 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 Hi @fhueske, @wuchong I've updated the PR according to your feedback. Could you please review it again?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804881#comment-15804881 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 @fhueske @wuchong Makes sense to me as well. I'll try to update the PR during the weekend.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804396#comment-15804396 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3039 Makes sense to me.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804349#comment-15804349 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3039 @mushketyk no worries :-) @wuchong Since the methods are only added when needed, by implementing the interface, there is no default implementation. The logic of the default implementation (calling the static method of `TableEnvironment`) is put directly into `TableSourceTable` and is only replaced if the table source implements the new interface.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804341#comment-15804341 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3039 I like this idea. This way, we only need to provide the `BatchTableSource` and `StreamTableSource` interfaces, without involving the odd `BatchStreamTableSource`. We can keep the interface very clean. If I understand correctly, none of the concrete implementations of `TableSource` will implement `DefinedFieldNames` for now?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804333#comment-15804333 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 @fhueske Sorry, there are only two methods. Please ignore my comment :) I think you are right and this seems like a good approach. If @wuchong is on board with this I'll update the PR accordingly.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804300#comment-15804300 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3039 Which `getTypeIndices` method are you referring to? `TableSource` only has `getReturnType`, `getFieldNames` and `getFieldIndices`. If we move the latter two to a separate interface, only `getReturnType` is left. Also, I think this is typical OO design. We do not need reflection to check whether an object implements an interface. That's a very common operation in Java and Scala. A simple `isInstanceOf[DefinedFieldNames]` in `TableSourceTable` is sufficient to check whether the table source implements the interface or not. Isn't this a good compromise between having a lean interface (also simple for Java users) and at the same time the possibility to override field names if necessary?
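The interface check discussed in this comment can be sketched roughly as follows. This is only an illustration built on simplified stand-ins: `SimpleType`, `SimpleTableSource`, `SimpleDefinedFieldNames`, `resolveFieldNames`, `PlainSource` and `RenamingSource` are hypothetical names invented for the example and are not Flink classes, and `resolveFieldNames` merely plays the role the thread assigns to `TableSourceTable`.

```scala
// Simplified stand-ins for illustration only; none of these are real Flink classes.
object FieldNameResolution {

  // Stand-in for a type description that already carries field names.
  final case class SimpleType(fieldNames: Array[String])

  // Lean base interface: only the return type is mandatory.
  trait SimpleTableSource {
    def getReturnType: SimpleType
  }

  // Optional interface for sources that want to override the field names.
  trait SimpleDefinedFieldNames {
    def getFieldNames: Array[String]
    def getFieldIndices: Array[Int]
  }

  // Use the custom names if the source implements the optional interface,
  // otherwise fall back to the names carried by the return type.
  def resolveFieldNames(source: SimpleTableSource): Array[String] =
    source match {
      case custom: SimpleDefinedFieldNames => custom.getFieldNames
      case _                               => source.getReturnType.fieldNames
    }

  // A source that relies on the names from its return type.
  class PlainSource extends SimpleTableSource {
    override def getReturnType: SimpleType = SimpleType(Array("f0", "f1"))
  }

  // A source that overrides the names through the optional interface.
  class RenamingSource extends SimpleTableSource with SimpleDefinedFieldNames {
    override def getReturnType: SimpleType = SimpleType(Array("f0", "f1"))
    override def getFieldNames: Array[String] = Array("id", "name")
    override def getFieldIndices: Array[Int] = Array(0, 1)
  }
}
```

In this sketch a pattern match replaces the explicit `isInstanceOf` call, but both perform an ordinary runtime type test rather than a reflective lookup. A source such as `PlainSource` never has to know that the optional interface exists.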
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804281#comment-15804281 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 I don't think we will gain much from this. Even if we remove these two methods from the `TableSource` trait, there is still the `getTypeIndices` method, and Java users will have to call it if they are going to implement the `TableSource` trait. And if a user knows how to inherit a trait with one method, he/she will be able to inherit a trait with three methods. The second problem with this approach is that it's not really object-oriented. We will have to rely on reflection tricks (probably sugared with pattern matching), while simply having three methods is a cleaner OO solution. What if we keep all three methods and simply add some base Java implementations that already implement these traits? Something like `JavaBatchTableSource`, `JavaStreamTableSource`, and `JavaBatchStreamTableSource`? Then users will not need to struggle with the trait inheritance issues.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804149#comment-15804149 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3039 We remove the methods from `TableSource` and add them to a separate interface. If a table source does not implement the methods, we use the names provided by the `TypeInformation`. If the table source implements the methods, we use those names. The distinction is made in `TableSourceTable`. What do you think?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804108#comment-15804108 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 I don't think I got the idea :) Could you elaborate on it?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804100#comment-15804100 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3039 I just had a crazy thought. :-) What do you think about moving `getFieldNames()` and `getFieldIndices()` into a separate trait / interface, maybe `DefinedFieldNames`?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803367#comment-15803367 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3039 I'm fine with removing `getFieldNames` and `getFieldIndices`. The Scala trait approach makes it too hard for Java users who do not know this trick to implement a custom `TableSource`.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802752#comment-15802752 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 Hi @fhueske, @wuchong I think we can solve the problem with the current `TableSource` class hierarchy in a different way. If we remove all abstract classes and move all default implementations into the `TableSource` trait, the class hierarchy becomes much simpler. The only drawback is that Java users will need to provide implementations of the trait methods that explicitly [call default implementations](http://stackoverflow.com/a/7637888). I don't think this is bad, since it's a common way to extend Scala traits. We can additionally remove `getFieldNames` and `getFieldIndices` if you think they are superfluous, but I don't think there is a big difference. What do you think @fhueske, @wuchong?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802730#comment-15802730 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r94865876

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala ---
    @@ -0,0 +1,30 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +/**
    +  * Partial implementation of the [[BatchStreamTableSource]] trait.
    +  *
    +  * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]].
    +  */
    +abstract class AbstractBatchStreamTableSource[T]
    +extends AbstractTableSource[T]
    +with BatchStreamTableSource[T] {
    --- End diff --

    Good point. I'll update this.
    What do you think about the overall design of the TableSource-related classes?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802703#comment-15802703 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94859631 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +511,92 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]] +* or [[org.apache.flink.api.common.typeinfo.AtomicType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]] +* or [[org.apache.flink.api.common.typeinfo.AtomicType]]. +* +* @param inputType The TypeInformation extract the field names. +* @tparam A The type of the TypeInformation. +* @return A an array holding the field names +*/ + def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = { +validateType(inputType) + +val fieldNames: Array[String] = inputType match { + case t: CompositeType[_] => t.getFieldNames + case a: AtomicType[_] => Array("f0") + case tpe => +throw new TableException(s"Currently only CompositeType and AtomicType are supported. 
" + + s"Type $tpe lacks explicit field naming") +} + +if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") +} + +fieldNames + } + + /** +* Validate if class represented by the typeInfo is static and globally accessible +* @param typeInfo type to check +* @throws TableException if type does not meet these criteria +*/ + def validateType(typeInfo: TypeInformation[_]): Unit = { +val clazz = typeInfo.getTypeClass +if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + +s"static and globally accessible.") +} + } + + /** +* Returns field indexes for a given [[TypeInformation]]. +* +* Field indexes are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]] +* or [[org.apache.flink.api.common.typeinfo.AtomicType]]. +* The method fails if inputType is not a --- End diff -- No need to mention this, IMO. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802702#comment-15802702 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94857414 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -31,7 +31,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeIn import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.{TableConfig, TableEnvironment} --- End diff -- import can be removed? > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802705#comment-15802705 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94856759 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala --- @@ -120,6 +124,7 @@ trait FlinkRel { case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8 case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4 case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12 +case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(0).getType()).asInstanceOf[Int] --- End diff -- shouldn't `get(0)` access the type at the index of the current field that is folded instead of always the first? > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
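The concern in this review comment can be illustrated with a small self-contained sketch. It does not use Calcite: `FieldType`, `IntType`, `StringType`, `RowType`, and the byte sizes are hypothetical stand-ins for `RelDataType` and `SqlTypeName`. The only point is that each fold step should look at the type of the field currently being folded, and recurse into that field when it is a nested row, rather than always reading the first field of the enclosing row.

```scala
object RowSizeSketch {
  // Hypothetical, simplified stand-ins for Calcite's RelDataType / SqlTypeName.
  sealed trait FieldType
  case object IntType extends FieldType
  case object StringType extends FieldType
  final case class RowType(fields: List[FieldType]) extends FieldType

  // Assumed per-type sizes in bytes; the real estimates live in FlinkRel.
  def estimateRowSize(row: RowType): Int =
    row.fields.foldLeft(0) { (s, fieldType) =>
      fieldType match {
        case IntType         => s + 4
        case StringType      => s + 12
        // Recurse into the field that is being folded right now,
        // not into the first field of the enclosing row.
        case nested: RowType => s + estimateRowSize(nested)
      }
    }
}
```

With this shape, a nested row in the second position contributes its own size no matter what type the first field has.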
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802699#comment-15802699 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94852514 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java --- @@ -92,4 +90,25 @@ public void testBatchTableSourceSQL() throws Exception { compareResultAsText(results, expected); } + @Test + public void testNestedBatchTableSourceSQL() throws Exception { --- End diff -- The Scala SQL ITCase should be sufficient to test this feature. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802706#comment-15802706 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94858739 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +511,92 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]] +* or [[org.apache.flink.api.common.typeinfo.AtomicType]]. +* The method fails if inputType is not a --- End diff -- I don't think we need to mention this. All Flink types are either composite or atomic (even though this is not strictly enforced). > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802700#comment-15802700 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94851899 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +/** + * Partial implementation of the [[BatchStreamTableSource]] trait. + * + * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]]. + */ +abstract class AbstractBatchStreamTableSource[T] +extends AbstractTableSource[T] +with BatchStreamTableSource[T] { --- End diff -- can't we extend from `BatchTableSource` and `StreamTableSource` instead of having an additional `BatchStreamTableSource`? 
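A minimal sketch of the suggestion in this comment, assuming heavily simplified versions of the traits: `Seq` and `Iterator` stand in for `DataSet` and `DataStream`, the execution environments are omitted, and `InMemorySource`, `scanBatch`, and `scanStream` are hypothetical names. It shows a source that supports both modes by mixing in the two existing traits directly, without an extra combined type.

```scala
object MixinSketch {
  // Hypothetical, simplified stand-ins: the real traits return a DataSet / DataStream.
  trait TableSource[T] { def getReturnType: String }
  trait BatchTableSource[T] extends TableSource[T] { def getDataSet: Seq[T] }
  trait StreamTableSource[T] extends TableSource[T] { def getDataStream: Iterator[T] }

  // A source for both modes mixes in the two traits directly;
  // no extra BatchStreamTableSource type is involved.
  class InMemorySource(rows: Seq[String])
      extends BatchTableSource[String]
      with StreamTableSource[String] {
    override def getReturnType: String = "String"
    override def getDataSet: Seq[String] = rows
    override def getDataStream: Iterator[String] = rows.iterator
  }

  // Consumers keep asking for exactly the capability they need.
  def scanBatch[T](source: BatchTableSource[T]): Int = source.getDataSet.size
  def scanStream[T](source: StreamTableSource[T]): Int = source.getDataStream.size
}
```

The same `InMemorySource` instance can be passed to both `scanBatch` and `scanStream`.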
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802701#comment-15802701 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94851171 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -19,21 +19,28 @@ package org.apache.flink.table.sources import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableEnvironment -/** Defines an external table by providing schema information, i.e., field names and types. +/** Defines an external table by providing schema information and used to produce a + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]]. + * Schema information consists of a data type, field names, and corresponding indices of + * these names in the data type. + * + * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case + * field names and field indices are derived from the returned type. + * + * In case if custom field names are required one need to implement both + * [[TableSource#getFieldsNames]] and [[TableSource#getFieldsIndices]]. * * @tparam T The return type of the [[TableSource]]. */ trait TableSource[T] { --- End diff -- Please rename `getFieldsNames()` to `getFieldNames()` (the original `getFieldsNames()` looks like a typo to me) and `getFieldsIndices()` to `getFieldIndices()`. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. 
> However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802704#comment-15802704 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94853135 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala --- @@ -60,4 +67,52 @@ object CommonTestData { ignoreComments = "%" ) } + + def getNestedTableSource: BatchTableSource[Person] = { +new AbstractBatchTableSource[Person] { + override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Person] = { +val executionEnvironment = ExecutionEnvironment.getExecutionEnvironment +executionEnvironment.fromCollection( + util.Arrays.asList( +new Person("Mike", "Smith", new Address("5th Ave", "New-York")), +new Person("Sally", "Miller", new Address("Potsdamer Platz", "Berlin")), +new Person("Bob", "Taylor", new Address("Pearse Street", "Dublin"))), + getReturnType +) + } + + /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */ + override def getReturnType: TypeInformation[Person] = new PojoTypeInfo[Person]( --- End diff -- `TypeExtractor.getForClass(Person)` should return a `PojoTypeInfo[Person]`. No need to fiddle with Java reflection ;-) > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. 
> The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802707#comment-15802707 ] ASF GitHub Bot commented on FLINK-5280: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94859657 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +511,92 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]] +* or [[org.apache.flink.api.common.typeinfo.AtomicType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]] +* or [[org.apache.flink.api.common.typeinfo.AtomicType]]. +* +* @param inputType The TypeInformation extract the field names. +* @tparam A The type of the TypeInformation. +* @return A an array holding the field names +*/ + def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = { +validateType(inputType) + +val fieldNames: Array[String] = inputType match { + case t: CompositeType[_] => t.getFieldNames + case a: AtomicType[_] => Array("f0") + case tpe => +throw new TableException(s"Currently only CompositeType and AtomicType are supported. 
" + + s"Type $tpe lacks explicit field naming") +} + +if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") +} + +fieldNames + } + + /** +* Validate if class represented by the typeInfo is static and globally accessible +* @param typeInfo type to check +* @throws TableException if type does not meet these criteria +*/ + def validateType(typeInfo: TypeInformation[_]): Unit = { +val clazz = typeInfo.getTypeClass +if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + +s"static and globally accessible.") +} + } + + /** +* Returns field indexes for a given [[TypeInformation]]. +* +* Field indexes are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]] +* or [[org.apache.flink.api.common.typeinfo.AtomicType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]] +* or [[org.apache.flink.api.common.typeinfo.AtomicType]]. +* +* @param inputType The TypeInformation extract the field positions from. +* @return A an array holding the field positions +*/ + def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = { +getFieldNames(inputType).indices.toArray + } + + /** +* Returns field types for a given [[TypeInformation]]. +* +* Field types are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]] +* or [[org.apache.flink.api.common.typeinfo.AtomicType]]. +* The method fails if inputType is not a --- End diff -- No need to mention this, IMO. 
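For readers following the diff under review, the field-name and field-index derivation can be sketched without any Flink dependency. `TypeInfo`, `Composite`, and `Atomic` are hypothetical stand-ins for `TypeInformation`, `CompositeType`, and `AtomicType`, and `IllegalArgumentException` replaces `TableException` only to keep the sketch self-contained.

```scala
object FieldInfoSketch {
  // Hypothetical stand-ins for Flink's TypeInformation hierarchy.
  sealed trait TypeInfo
  final case class Composite(fieldNames: Array[String]) extends TypeInfo
  case object Atomic extends TypeInfo

  // Mirrors the shape of getFieldNames in the diff: composite types expose
  // their own names, atomic types get the single default name "f0".
  def getFieldNames(inputType: TypeInfo): Array[String] = {
    val names = inputType match {
      case Composite(fieldNames) => fieldNames
      case Atomic                => Array("f0")
    }
    if (names.contains("*")) {
      throw new IllegalArgumentException("Field name can not be '*'.")
    }
    names
  }

  // Field indexes are simply the positions 0 until n of the field names.
  def getFieldIndexes(inputType: TypeInfo): Array[Int] =
    getFieldNames(inputType).indices.toArray
}
```

Because the two stand-in cases are sealed, no fallback branch is needed here, which matches the reviewer's remark that all Flink types are in practice either composite or atomic.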
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15796909#comment-15796909 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3039
What about extracting `getDataSet(ExecutionEnvironment)` and `getDataStream(StreamExecutionEnvironment)` into interfaces called something like `DataSetGetter` and `DataStreamGetter`? We can then make `BatchTableSource` extend the `TableSource` abstract class and implement the `DataSetGetter` interface, make `StreamTableSource` extend the `TableSource` abstract class and implement the `DataStreamGetter` interface, and make `BatchStreamTableSource` implement both `DataSetGetter` and `DataStreamGetter`. That way we can use `TableSource` plus `DataSetGetter` where only a `BatchTableSource` is expected. For example, `BatchTableSourceScan` could be changed to something like this:
```scala
class BatchTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    table: RelOptTable,
    val tableSource: TableSource[_],
    val datasetGetter: DataSetGetter)
```
Can this solve our problem?
> Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
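The proposal in this comment can be sketched as follows, under several simplifying assumptions: `Seq` and `Iterator` stand in for `DataSet` and `DataStream`, the environment parameters are dropped, and `BatchScan` and `Both` are hypothetical names used only for illustration. It is not the code from the pull request.

```scala
object GetterSketch {
  // Hypothetical simplifications of the proposal: Seq and Iterator stand in
  // for DataSet and DataStream, and the environments are left out.
  abstract class TableSource[T] { def getFieldNames: Array[String] }
  trait DataSetGetter[T] { def getDataSet: Seq[T] }
  trait DataStreamGetter[T] { def getDataStream: Iterator[T] }

  abstract class BatchTableSource[T] extends TableSource[T] with DataSetGetter[T]
  abstract class StreamTableSource[T] extends TableSource[T] with DataStreamGetter[T]
  abstract class BatchStreamTableSource[T]
      extends TableSource[T] with DataSetGetter[T] with DataStreamGetter[T]

  // The scan depends on the schema part and the getter part separately,
  // so it accepts anything that provides both roles.
  class BatchScan[T](val tableSource: TableSource[T], val dataSetGetter: DataSetGetter[T]) {
    def rowCount: Int = dataSetGetter.getDataSet.size
  }

  class Both extends BatchStreamTableSource[Int] {
    override def getFieldNames: Array[String] = Array("f0")
    override def getDataSet: Seq[Int] = Seq(1, 2, 3)
    override def getDataStream: Iterator[Int] = Iterator(1, 2, 3)
  }
}
```

One visible cost of this design is that a caller passes the same object twice, once as `TableSource` and once as `DataSetGetter`, and nothing in the types ties the two arguments together.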
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15795541#comment-15795541 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 Hi @wuchong >> And provide three other abstract class : BatchTableSource (with getDataSet interface) , StreamTableSource (with getDataStream interface) and BatchStreamTableSource (with both interfaces), they all extend TableSource I thought about this, but in this case we won't be able to use an instance of `BatchStreamTableSource` where an instance of `BatchTableSource` or `StreamTableSource` is expected, which seems to make the `BatchStreamTableSource` abstract class useless. I believe that we should be able to use `BatchStreamTableSource` wherever a `TableSource`, `BatchTableSource`, or `StreamTableSource` is expected. This requires multiple inheritance, which is only possible with traits. But since we also want to provide partial implementations of these types, I've added several abstract classes for users to inherit from. I see the issue with the current approach, but I am not sure how to simplify it while still achieving all the required goals. Would better documentation do the trick? > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15793958#comment-15793958 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3039 Hi @mushketyk, thanks for the update. Regarding `TableSource`: currently there are 8 `TableSource` interfaces exposed to users, counting traits and abstract classes, which makes it a little hard to choose which one to use when implementing a custom table source. What about implementing `TableSource` as an abstract class that provides default implementations for `getFieldNames()` and `getFieldsIndices()`, and providing three other abstract classes: `BatchTableSource` (with the `getDataSet` interface), `StreamTableSource` (with the `getDataStream` interface), and `BatchStreamTableSource` (with both interfaces), all of which extend `TableSource`? This way we only expose 4 classes to users, and implementers extend one of the latter three abstract classes. What do you think? > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
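A minimal sketch of the abstract-class layout suggested in this comment, assuming a much simpler model than the real API: the return type is represented by a plain array of field names (`getReturnTypeFields`, a hypothetical method) instead of a `TypeInformation`, `Seq` stands in for `DataSet`, and only the batch variant is shown. `Person` and `PersonSource` are illustrative names.

```scala
object AbstractSourceSketch {
  // Hypothetical simplification: the return type is modelled as a list of
  // field names instead of a Flink TypeInformation.
  abstract class TableSource[T] {
    def getReturnTypeFields: Array[String]
    // Defaults derived from the return type; subclasses override both
    // methods together when they need custom names.
    def getFieldNames: Array[String] = getReturnTypeFields
    def getFieldIndices: Array[Int] = getReturnTypeFields.indices.toArray
  }

  abstract class BatchTableSource[T] extends TableSource[T] {
    def getDataSet: Seq[T]
  }

  final case class Person(firstName: String, lastName: String)

  // An implementer extends a single abstract class and supplies only
  // the return type and the data.
  class PersonSource extends BatchTableSource[Person] {
    override def getReturnTypeFields: Array[String] = Array("firstName", "lastName")
    override def getDataSet: Seq[Person] = Seq(Person("Mike", "Smith"))
  }
}
```

Because abstract classes compile to ordinary JVM classes, a Java implementer could extend `BatchTableSource` the same way, which is the usability argument made elsewhere in this thread.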
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15786387#comment-15786387 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 Hi @fhueske, @wuchong I've updated my PR according to your reviews. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15784253#comment-15784253 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94095173 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -268,23 +268,9 @@ object UserDefinedFunctionUtils { def getFieldInfo(inputType: TypeInformation[_]) --- End diff -- I think we can refactor this: provide a static `getFieldInfo` method, get the field names from `getFieldInfo`, and then do the additional check outside. What do you think? > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15783939#comment-15783939 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r94087339 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -268,23 +268,9 @@ object UserDefinedFunctionUtils { def getFieldInfo(inputType: TypeInformation[_]) --- End diff -- `getFieldInfo` is overridden in `StreamTableEnvironment`, so I cannot make it a static method. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15780914#comment-15780914 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 Hi @fhueske, @wuchong Thank you for your reviews. I think creating three abstract classes is a good idea, since we don't expect any new types of table sources, so there will not be a lot of combinations. I'll try to update the PR tomorrow according to all current comments. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15780667#comment-15780667 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3039

    Thanks for working on this @mushketyk, and for the reviews @wuchong.
    I just added a comment regarding the Scala trait with implemented functions. I'll do a more thorough review in the next few days.

    Thanks, Fabian
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15780663#comment-15780663 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93943868

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
         return kafkaSource;
       }

    -  @Override
    -  public int getNumberOfFields() {
    -    return fieldNames.length;
    -  }
    -
    -  @Override
       public String[] getFieldsNames() {
    -    return fieldNames;
    +    return TableSource$class.getFieldsNames(this);
    --- End diff --

    Hi, I'm not sure about implementing this as a Scala trait with implemented methods. IMO, this makes it much harder to implement TableSources in Java (esp. for users who are not familiar with Scala and its implications).
    What do you think about implementing `TableSource` as an abstract class and providing three other abstract classes that extend `TableSource` with the batch, the stream, and both interfaces?
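The abstract-class layout proposed in the comment above can be sketched in plain Java. This is only a minimal sketch under stated assumptions, not the PR's code: every type below (`SketchTableSource`, `SketchBatchTableSource`, `SketchStreamTableSource`, `SketchBatchStreamTableSource`, `PersonSource`) is a hypothetical stand-in, the `List<T>` return types merely take the place of the batch and stream interfaces, and the field names are supplied directly instead of being derived from a `TypeInformation`. What it illustrates is that a Java subclass inherits the default methods directly, without delegating to `TableSource$class` as the diff above has to.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the proposed abstract TableSource base class.
abstract class SketchTableSource<T> {

    /** Stand-in for getReturnType: here it only reports the field names of the produced type. */
    abstract String[] returnTypeFieldNames();

    /** Default: field names are derived from the return type. */
    String[] getFieldsNames() {
        return returnTypeFieldNames();
    }

    /** Default: positions 0..n-1, in the order of the field names. */
    int[] getFieldsIndices() {
        int[] indices = new int[getFieldsNames().length];
        for (int i = 0; i < indices.length; i++) {
            indices[i] = i;
        }
        return indices;
    }
}

// One abstract class per combination: batch, stream, and both (all hypothetical).
abstract class SketchBatchTableSource<T> extends SketchTableSource<T> {
    abstract List<T> getDataSet();
}

abstract class SketchStreamTableSource<T> extends SketchTableSource<T> {
    abstract List<T> getDataStream();
}

abstract class SketchBatchStreamTableSource<T> extends SketchTableSource<T> {
    abstract List<T> getDataSet();
    abstract List<T> getDataStream();
}

// A Java implementation only fills in the abstract methods.
final class PersonSource extends SketchBatchTableSource<String[]> {
    @Override
    String[] returnTypeFieldNames() {
        return new String[] {"name", "age"};
    }

    @Override
    List<String[]> getDataSet() {
        return Arrays.asList(new String[] {"Ann", "31"}, new String[] {"Bob", "27"});
    }

    public static void main(String[] args) {
        PersonSource source = new PersonSource();
        System.out.println(Arrays.toString(source.getFieldsNames()));
        System.out.println(Arrays.toString(source.getFieldsIndices()));
    }
}
```

A source that needs custom field names would override both `getFieldsNames` and `getFieldsIndices` in its subclass, which matches the contract described in the `TableSource` documentation quoted elsewhere in this thread.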
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775059#comment-15775059 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93818113

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +510,77 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: CompositeType[_] => t.getFieldNames
    +      case a: AtomicType[_] => Array("f0")
    +      case tpe =>
    +        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
    +          s"Type $tpe lacks explicit field naming")
    +    }
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    fieldNames
    +  }
    +
    +  /**
    +    * Validate if class represented by the typeInfo is static and globally accessible
    +    * @param typeInfo type to check
    +    * @throws TableException if type does not meet these criteria
    +    */
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    +    val clazz = typeInfo.getTypeClass
    +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
    +      !Modifier.isPublic(clazz.getModifiers) ||
    +      clazz.getCanonicalName == null) {
    +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
    +        s"static and globally accessible.")
    +    }
    +  }
    +
    +  def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
    --- End diff --

    Would be better to add a comment above since it is a public method.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775051#comment-15775051 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93816786

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -522,26 +523,29 @@ object TableEnvironment {
         * @tparam A The type of the TypeInformation.
         * @return A tuple of two arrays holding the field names and corresponding field positions.
         */
    -  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
    --- End diff --

    The javadoc should be updated since the method changed.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775056#comment-15775056 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93818167

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,21 +19,32 @@
     package org.apache.flink.table.sources

     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment

    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
    +  * field names and field indices are derived from the returned type.
    +  *
    +  * In case if custom field names are required one need to implement both
    +  * [[TableSource#getFieldsNames]] and [[TableSource#getFieldsIndices]].
       *
       * @tparam T The return type of the [[TableSource]].
       */
     trait TableSource[T] {

    -  /** Returns the number of fields of the table. */
    -  def getNumberOfFields: Int
    -
       /** Returns the names of the table fields. */
    -  def getFieldsNames: Array[String]
    -
    -  /** Returns the types of the table fields. */
    -  def getFieldTypes: Array[TypeInformation[_]]
    +  def getFieldsNames: Array[String] = {
    +    TableEnvironment.getFieldNames(getReturnType)
    +  }
    +
    +  /** Returns the indices of the table fields. */
    +  def getFieldsIndexes: Array[Int] = {
    --- End diff --

    The plural of index should be indices.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775057#comment-15775057 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93818063

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala ---
    @@ -44,14 +44,14 @@ abstract class FlinkTable[T](

       val fieldTypes: Array[TypeInformation[_]] =
         typeInfo match {
    -      case cType: CompositeType[T] =>
    +      case cType: CompositeType[_] =>
    --- End diff --

    The fieldTypes can be obtained from `TableEnvironment.getFieldTypes(typeInfo)`, and the length validation could be placed above.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775060#comment-15775060 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93817904

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
         return kafkaSource;
       }

    -  @Override
    -  public int getNumberOfFields() {
    -    return fieldNames.length;
    -  }
    -
    -  @Override
       public String[] getFieldsNames() {
    -    return fieldNames;
    +    return TableSource$class.getFieldsNames(this);
    --- End diff --

    Would be better to comment why we do this.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775052#comment-15775052 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93818018

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +510,77 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    --- End diff --

    Please update the javadoc.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775047#comment-15775047 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93816520

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -268,23 +268,9 @@ object UserDefinedFunctionUtils {
       def getFieldInfo(inputType: TypeInformation[_])
    --- End diff --

    Can we move this method into the `TableEnvironment` object? It's better to put the field info related methods in one place.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775048#comment-15775048 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93816761

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -552,6 +556,10 @@ object TableEnvironment {
         }
       }

    +  def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
    --- End diff --

    The plural of index should be indices. And please add a comment to describe what this method does.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775055#comment-15775055 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93818027

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +510,77 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: CompositeType[_] => t.getFieldNames
    +      case a: AtomicType[_] => Array("f0")
    +      case tpe =>
    +        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
    +          s"Type $tpe lacks explicit field naming")
    +    }
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    fieldNames
    +  }
    +
    +  /**
    +    * Validate if class represented by the typeInfo is static and globally accessible
    +    * @param typeInfo type to check
    +    * @throws TableException if type does not meet these criteria
    +    */
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    +    val clazz = typeInfo.getTypeClass
    +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
    +      !Modifier.isPublic(clazz.getModifiers) ||
    +      clazz.getCanonicalName == null) {
    +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
    +        s"static and globally accessible.")
    +    }
    +  }
    +
    +  def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
    +    getFieldNames(inputType).indices.toArray
    +  }
    +
    +  /**
    +    * Returns field types for a given [[TypeInformation]].
    +    *
    +    * Field types are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation to extract field types from.
    +    * @return an holding the field types.
    --- End diff --

    An array holding the field types.
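The three conditions checked by `validateType` in the diff above rely only on `java.lang.reflect`, so they can be tried out without Flink. The following is a minimal sketch, assuming a hypothetical helper `TypeAccessCheck` that returns a boolean where the quoted method would throw a `TableException`; the nested sample classes are likewise made up for illustration.

```java
import java.lang.reflect.Modifier;

// Hypothetical helper; it restates the condition of the quoted validateType as a boolean.
final class TypeAccessCheck {

    /** Returns true if the class passes all three checks, false where validateType would throw. */
    static boolean isStaticAndGloballyAccessible(Class<?> clazz) {
        boolean nonStaticMember =
            clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers());
        boolean notPublic = !Modifier.isPublic(clazz.getModifiers());
        boolean noCanonicalName = clazz.getCanonicalName() == null;
        return !(nonStaticMember || notPublic || noCanonicalName);
    }

    // Sample classes for the check (all hypothetical).
    public static class PublicStaticNested {}

    public class PublicInner {}

    static class PackagePrivateNested {}

    public static void main(String[] args) {
        Object anonymous = new Object() {};
        System.out.println("String: " + isStaticAndGloballyAccessible(String.class));
        System.out.println("public static nested: "
            + isStaticAndGloballyAccessible(PublicStaticNested.class));
        System.out.println("public inner (non-static): "
            + isStaticAndGloballyAccessible(PublicInner.class));
        System.out.println("package-private nested: "
            + isStaticAndGloballyAccessible(PackagePrivateNested.class));
        System.out.println("anonymous: "
            + isStaticAndGloballyAccessible(anonymous.getClass()));
    }
}
```

Note that, as written, the condition inspects only the modifiers of the class itself; it does not look at the visibility of an enclosing class.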
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775049#comment-15775049 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93818006

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +510,77 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    --- End diff --

    The comment should be updated since AtomicType is also supported.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775054#comment-15775054 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93816451

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -566,17 +574,13 @@ object TableEnvironment {
       def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
         validateType(inputType)

    -    inputType match {
    -      case t: TupleTypeInfo[_] => getTypes(t)
    -      case c: CaseClassTypeInfo[_] => getTypes(c)
    -      case p: PojoTypeInfo[_] => getTypes(p)
    -      case r: RowTypeInfo => getTypes(r)
    -      case tpe =>
    -        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    getFieldNames(inputType).map { i =>
    --- End diff --

    I think this may be error-prone. The field types array should be mapped by indices, not by field names. For example, the order of a PojoType's field names is not equal to the order of its field types. The original code in `UserDefinedFunctionUtils.getFieldInfo` may be wrong too.
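The kind of mismatch this comment warns about can be shown with a toy model. The quoted diff is cut off after `map { i =>`, so what the PR actually did at that point is not visible here; the sketch below is only one reading of the concern. `ToyCompositeType` and all of its methods are hypothetical and are not Flink's `CompositeType` or `PojoTypeInfo` API. The assumption being illustrated is that a composite type may report its field names in an order that differs from the positional order of its field types.

```java
import java.util.Arrays;

// Hypothetical toy model; it is not Flink's CompositeType or PojoTypeInfo API.
final class ToyCompositeType {
    // Field types by physical position in the record.
    private final String[] typesByPosition = {"Int", "String"};
    // Field names as reported to callers, deliberately in a different order.
    private final String[] reportedNames = {"name", "id"};
    // Physical position of each reported name (same order as reportedNames).
    private final int[] positionOfName = {1, 0};

    String[] getFieldNames() {
        return reportedNames.clone();
    }

    String getTypeAt(int position) {
        return typesByPosition[position];
    }

    int getFieldIndex(String name) {
        for (int i = 0; i < reportedNames.length; i++) {
            if (reportedNames[i].equals(name)) {
                return positionOfName[i];
            }
        }
        return -1;
    }

    /** Misaligned: assumes the i-th reported name sits at physical position i. */
    static String[] typesAssumingSameOrder(ToyCompositeType t) {
        String[] names = t.getFieldNames();
        String[] types = new String[names.length];
        for (int i = 0; i < names.length; i++) {
            types[i] = t.getTypeAt(i);
        }
        return types;
    }

    /** Aligned: resolves each name to its physical position before reading the type. */
    static String[] typesViaIndex(ToyCompositeType t) {
        String[] names = t.getFieldNames();
        String[] types = new String[names.length];
        for (int i = 0; i < names.length; i++) {
            types[i] = t.getTypeAt(t.getFieldIndex(names[i]));
        }
        return types;
    }

    public static void main(String[] args) {
        ToyCompositeType t = new ToyCompositeType();
        System.out.println(Arrays.toString(t.getFieldNames()));
        System.out.println(Arrays.toString(typesAssumingSameOrder(t)));
        System.out.println(Arrays.toString(typesViaIndex(t)));
    }
}
```

In this toy, pairing names and types purely by loop counter attaches the type `Int` to the field `name`; going through an explicit index per name keeps the two arrays consistent.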
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775058#comment-15775058 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93818032

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    +      case tpe =>
    +        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    }
    +    val fieldIndexes = fieldNames.indices.toArray
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    (fieldNames, fieldIndexes)
    +  }
    +
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    +    val clazz = typeInfo.getTypeClass
    +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
    +      !Modifier.isPublic(clazz.getModifiers) ||
    +      clazz.getCanonicalName == null) {
    +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
    +        s"static and globally accessible.")
    +    }
    +  }
    +
    +  /**
    +    * Returns field types for a given [[TypeInformation]].
    +    *
    +    * Field types are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    --- End diff --

    The comment should be updated since AtomicType is also supported.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775050#comment-15775050 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93817908

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
         return kafkaSource;
       }

    -  @Override
    -  public int getNumberOfFields() {
    -    return fieldNames.length;
    -  }
    -
    -  @Override
       public String[] getFieldsNames() {
    -    return fieldNames;
    +    return TableSource$class.getFieldsNames(this);
       }

    -  @Override
    -  public TypeInformation[] getFieldTypes() {
    -    return fieldTypes;
    +  public int[] getFieldsIndexes() {
    --- End diff --

    Would be better to comment why we do this. And `getFieldsIndexes` -> `getFieldsIndices`.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775053#comment-15775053 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93816695

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -26,24 +26,24 @@ import org.apache.flink.table.api.TableEnvironment
       * Schema information consists of a data type, field names, and corresponding indices of
       * these names in the data type.
       *
    -  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
    -  * field names and field indices are deducted from the returned type.
    +  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
    +  * field names and field indices are derived from the returned type.
       *
       * In case if custom field names are required one need to implement both
    -  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
    +  * [[TableSource#getFieldsNames]] and [[TableSource#getFieldsIndices]].
       *
       * @tparam T The return type of the [[TableSource]].
       */
     trait TableSource[T] {

       /** Returns the names of the table fields. */
       def getFieldsNames: Array[String] = {
    -    TableEnvironment.getFieldInfo(getReturnType)._1
    +    TableEnvironment.getFieldNames(getReturnType)
       }

       /** Returns the indices of the table fields. */
    -  def getFieldsIndices: Array[Int] = {
    -    getFieldsNames.indices.toArray
    +  def getFieldsIndexes: Array[Int] = {
    --- End diff --

    The plural of index should be indices.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15774190#comment-15774190 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/3039

    Hi @wuchong

    I've updated the PR according to your comments. Could you please review it again?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773574#comment-15773574 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93797831

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -340,25 +331,8 @@ abstract class TableEnvironment(val config: TableConfig) {
         * @return A tuple of two arrays holding the field names and corresponding field positions.
         */
       protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
    -    (Array[String], Array[Int]) =
    -  {
    -    validateType(inputType)
    -
    -    val fieldNames: Array[String] = inputType match {
    -      case t: TupleTypeInfo[A] => t.getFieldNames
    -      case c: CaseClassTypeInfo[A] => c.getFieldNames
    -      case p: PojoTypeInfo[A] => p.getFieldNames
    -      case r: RowTypeInfo => r.getFieldNames
    -      case tpe =>
    -        throw new TableException(s"Type $tpe lacks explicit field naming")
    -    }
    -    val fieldIndexes = fieldNames.indices.toArray
    -
    -    if (fieldNames.contains("*")) {
    -      throw new TableException("Field name can not be '*'.")
    -    }
    -
    -    (fieldNames, fieldIndexes)
    +    (Array[String], Array[Int]) = {
    +    TableEnvironment.getFieldInfo(inputType)
    --- End diff --

    It is overridden in a subclass, so I decided to leave this method here and only move the body out of it to make it reusable.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773176#comment-15773176 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93780163

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
             return kafkaSource;
         }
    -    @Override
    -    public int getNumberOfFields() {
    -        return fieldNames.length;
    -    }
    -
    -    @Override
         public String[] getFieldsNames() {
    -        return fieldNames;
    +        return TableSource$class.getFieldsNames(this);
    --- End diff --

    Makes sense to me. It seems that we have to keep them as traits.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772879#comment-15772879 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93764349

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
             return kafkaSource;
         }
    -    @Override
    -    public int getNumberOfFields() {
    -        return fieldNames.length;
    -    }
    -
    -    @Override
         public String[] getFieldsNames() {
    -        return fieldNames;
    +        return TableSource$class.getFieldsNames(this);
    --- End diff --

    `CsvTableSource` inherits from both `StreamTableSource` and `BatchTableSource`, so they have to be traits. I don't think that adding a method that only calls the implementation from a trait is a big issue. In any case, we neither duplicate code nor re-implement methods. Do you have any concerns about this?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772868#comment-15772868 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93763884

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
             return kafkaSource;
         }
    -    @Override
    -    public int getNumberOfFields() {
    -        return fieldNames.length;
    -    }
    -
    -    @Override
         public String[] getFieldsNames() {
    -        return fieldNames;
    +        return TableSource$class.getFieldsNames(this);
    --- End diff --

    What about making `TableSource` an abstract class, so that it works with both Java and Scala without resorting to a hack? In that case, `StreamTableSource` and `BatchTableSource` would have to be abstract classes too.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772859#comment-15772859 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93763491

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,21 +19,32 @@ package org.apache.flink.table.sources
     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment
    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
    --- End diff --

    I'm not sure about that. My IDEA highlights `[[TableSource.getReturnType]]` as an error, but `[[TableSource#getReturnType]]` is fine. (IDEA 2016.3.1, Scala plugin 2016.3.5)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772837#comment-15772837 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/3039

    Hi @wuchong

    Thank you for the review. I'll try to update the PR today.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772834#comment-15772834 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93762178

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala ---
    @@ -44,14 +44,14 @@ abstract class FlinkTable[T](
       val fieldTypes: Array[TypeInformation[_]] =
    --- End diff --

    Ok, good point.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772836#comment-15772836 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93762225

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ---
    @@ -25,6 +25,6 @@ import org.apache.flink.types.Row
     /** Table which defines an external table via a [[TableSource]] */
     class TableSourceTable(val tableSource: TableSource[_])
       extends FlinkTable[Row](
    --- End diff --

    Ok, good point.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772832#comment-15772832 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93762127

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---
    @@ -38,7 +38,9 @@ class BatchTableSourceScan(
       override def deriveRowType() = {
         val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    -    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
    +    flinkTypeFactory.buildRowDataType(
    +      tableSource.getFieldsNames,
    +      TableEnvironment.getFieldTypes(tableSource.getReturnType))
    --- End diff --

    This should be fine if we add support for `AtomicType` in `TableEnvironment.getFieldTypes`.
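One way to read the suggestion above is that `getFieldTypes` would treat an atomic type as a row with a single field instead of rejecting it. The following is only a minimal plain-Java sketch of that idea, deliberately independent of Flink: `TypeInfo`, `AtomicTypeInfo`, `CompositeTypeInfo`, and `FieldTypes` are hypothetical stand-ins rather than Flink classes, and the change that actually went into the PR may look different.

```java
import java.util.Arrays;
import java.util.List;

/** Small driver that prints the field types of a nested row and of an atomic type. */
final class FieldTypesDemo {
    public static void main(String[] args) {
        TypeInfo address = new CompositeTypeInfo(new AtomicTypeInfo("String"), new AtomicTypeInfo("Integer"));
        TypeInfo person = new CompositeTypeInfo(new AtomicTypeInfo("String"), address);
        System.out.println(Arrays.toString(FieldTypes.getFieldTypes(person)));
        System.out.println(Arrays.toString(FieldTypes.getFieldTypes(new AtomicTypeInfo("Long"))));
    }
}

/** Hypothetical marker for type descriptions (stand-in, not Flink's TypeInformation). */
interface TypeInfo {}

/** Hypothetical atomic type: it has no fields of its own. */
final class AtomicTypeInfo implements TypeInfo {
    private final String name;

    AtomicTypeInfo(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }
}

/** Hypothetical composite type: an ordered list of top-level fields, which may be nested. */
final class CompositeTypeInfo implements TypeInfo {
    private final List<TypeInfo> fields;

    CompositeTypeInfo(TypeInfo... fields) {
        this.fields = Arrays.asList(fields.clone());
    }

    /** Number of top-level fields; nested fields are not counted. */
    int getArity() {
        return fields.size();
    }

    TypeInfo getTypeAt(int pos) {
        return fields.get(pos);
    }

    @Override
    public String toString() {
        return "Row" + fields;
    }
}

final class FieldTypes {
    private FieldTypes() {}

    /** Returns the top-level field types; an atomic type is treated as one field. */
    static TypeInfo[] getFieldTypes(TypeInfo input) {
        if (input instanceof CompositeTypeInfo) {
            CompositeTypeInfo composite = (CompositeTypeInfo) input;
            TypeInfo[] types = new TypeInfo[composite.getArity()];
            for (int i = 0; i < types.length; i++) {
                types[i] = composite.getTypeAt(i);
            }
            return types;
        }
        if (input instanceof AtomicTypeInfo) {
            // Assumption: an atomic type is exposed as a single-field row.
            return new TypeInfo[] {input};
        }
        throw new IllegalArgumentException("Type " + input + " lacks explicit field naming");
    }
}
```

In this sketch a nested row stays a single field of the outer row, so the result always has one entry per top-level field.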
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772830#comment-15772830 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93762016

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ---
    @@ -25,6 +25,6 @@ import org.apache.flink.types.Row
     /** Table which defines an external table via a [[TableSource]] */
    --- End diff --

    Good point.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772829#comment-15772829 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93761987

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
             return kafkaSource;
         }
    -    @Override
    -    public int getNumberOfFields() {
    -        return fieldNames.length;
    -    }
    -
    -    @Override
         public String[] getFieldsNames() {
    -        return fieldNames;
    +        return TableSource$class.getFieldsNames(this);
         }
    -    @Override
    -    public TypeInformation[] getFieldTypes() {
    -        return fieldTypes;
    +    public int[] getFieldsIndices() {
    +        return TableSource$class.getFieldsIndices(this);
    --- End diff --

    Ditto.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772828#comment-15772828 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93761979

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
             return kafkaSource;
         }
    -    @Override
    -    public int getNumberOfFields() {
    -        return fieldNames.length;
    -    }
    -
    -    @Override
         public String[] getFieldsNames() {
    -        return fieldNames;
    +        return TableSource$class.getFieldsNames(this);
    --- End diff --

    We do not override this method. As far as I understand, a Java class cannot inherit a method from a Scala trait if the trait provides an implementation for it: http://stackoverflow.com/a/7637888
    Am I missing something?
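The forwarding calls in this diff (`TableSource$class.getFieldsNames(this)`) follow from how Scala 2.11 and earlier compile a trait with concrete methods: Java sees the trait as a plain interface, while the method bodies live as static methods in a synthetic `TableSource$class` holder that takes the instance as its first argument. Below is a minimal plain-Java sketch of that encoding, assuming a simplified `TableSource` without a type parameter. `RowType`, `TableSourceDefaults` (standing in for the synthetic `TableSource$class`), and `KafkaLikeTableSource` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

/** Small driver that prints the derived field names and indices. */
final class TraitForwardingDemo {
    public static void main(String[] args) {
        TableSource source = new KafkaLikeTableSource();
        System.out.println(Arrays.toString(source.getFieldsNames()));
        System.out.println(Arrays.toString(source.getFieldsIndices()));
    }
}

/** Hypothetical stand-in for a return type that knows its field names. */
final class RowType {
    private final String[] fieldNames;

    RowType(String... fieldNames) {
        this.fieldNames = fieldNames.clone();
    }

    String[] getFieldNames() {
        return fieldNames.clone();
    }
}

/** What Java sees for the trait: an interface with abstract methods only. */
interface TableSource {
    RowType getReturnType();

    String[] getFieldsNames();

    int[] getFieldsIndices();
}

/** Stand-in for the synthetic holder of the trait's method bodies. */
final class TableSourceDefaults {
    private TableSourceDefaults() {}

    static String[] getFieldsNames(TableSource self) {
        return self.getReturnType().getFieldNames();
    }

    static int[] getFieldsIndices(TableSource self) {
        return IntStream.range(0, self.getFieldsNames().length).toArray();
    }
}

/** A Java implementor inherits no bodies, so it forwards each call explicitly. */
final class KafkaLikeTableSource implements TableSource {
    @Override
    public RowType getReturnType() {
        return new RowType("user", "address");
    }

    @Override
    public String[] getFieldsNames() {
        return TableSourceDefaults.getFieldsNames(this);
    }

    @Override
    public int[] getFieldsIndices() {
        return TableSourceDefaults.getFieldsIndices(this);
    }
}
```

Since Scala 2.12, concrete trait methods are compiled to Java 8 default methods, so this explicit forwarding should only be needed when building against older Scala versions.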
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772825#comment-15772825 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93761841

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,21 +19,32 @@ package org.apache.flink.table.sources
     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment
    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
    +  * field names and field indices are deducted from the returned type.
    +  *
    +  * In case if custom field names are required one need to implement both
    +  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
       *
       * @tparam T The return type of the [[TableSource]].
       */
     trait TableSource[T] {
    -  /** Returns the number of fields of the table. */
    -  def getNumberOfFields: Int
    -
       /** Returns the names of the table fields. */
    -  def getFieldsNames: Array[String]
    -
    -  /** Returns the types of the table fields. */
    -  def getFieldTypes: Array[TypeInformation[_]]
    +  def getFieldsNames: Array[String] = {
    +    TableEnvironment.getFieldInfo(getReturnType)._1
    +  }
    +
    +  /** Returns the indices of the table fields. */
    +  def getFieldsIndices: Array[Int] = {
    +    getFieldsNames.indices.toArray
    --- End diff --

    Ok, good point.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772822#comment-15772822 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93761791

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,21 +19,32 @@ package org.apache.flink.table.sources
     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment
    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
    +  * field names and field indices are deducted from the returned type.
    --- End diff --

    Good point.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772824#comment-15772824 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93761823

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,21 +19,32 @@ package org.apache.flink.table.sources
     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment
    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
    --- End diff --

    Aren't we using Scaladoc here? I thought the syntax was different in Scaladoc.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772819#comment-15772819 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93761574

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    +      case tpe =>
    +        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    }
    +    val fieldIndexes = fieldNames.indices.toArray
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    (fieldNames, fieldIndexes)
    +  }
    +
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    +    val clazz = typeInfo.getTypeClass
    +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
    +      !Modifier.isPublic(clazz.getModifiers) ||
    +      clazz.getCanonicalName == null) {
    +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
    +        s"static and globally accessible.")
    +    }
    +  }
    +
    +  /**
    +    * Returns field types for a given [[TypeInformation]].
    +    *
    +    * Field types are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation to extract field types from.
    +    * @return an holding the field types.
    +    */
    +  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
    +    validateType(inputType)
    +
    +    inputType match {
    +      case t: TupleTypeInfo[_] => getTypes(t)
    +      case c: CaseClassTypeInfo[_] => getTypes(c)
    +      case p: PojoTypeInfo[_] => getTypes(p)
    +      case r: RowTypeInfo => getTypes(r)
    +      case tpe =>
    +        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    }
    +  }
    +
    +  private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = {
    +    0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray
    --- End diff --

    Good point.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772816#comment-15772816 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93761556 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +509,74 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names and field positions for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation extract the field names and positions from. +* @tparam A The type of the TypeInformation. +* @return A tuple of two arrays holding the field names and corresponding field positions. 
+*/ + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { +validateType(inputType) + +val fieldNames: Array[String] = inputType match { + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case r: RowTypeInfo => r.getFieldNames + case tpe => +throw new TableException(s"Type $tpe lacks explicit field naming") +} +val fieldIndexes = fieldNames.indices.toArray + +if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") +} + +(fieldNames, fieldIndexes) + } + + def validateType(typeInfo: TypeInformation[_]): Unit = { +val clazz = typeInfo.getTypeClass +if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + +s"static and globally accessible.") +} + } + + /** +* Returns field types for a given [[TypeInformation]]. +* +* Field types are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation to extract field types from. +* @return an holding the field types. +*/ + def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = { +validateType(inputType) + +inputType match { + case t: TupleTypeInfo[_] => getTypes(t) + case c: CaseClassTypeInfo[_] => getTypes(c) + case p: PojoTypeInfo[_] => getTypes(p) + case r: RowTypeInfo => getTypes(r) --- End diff -- Ok, will update. 
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772813#comment-15772813 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93761442 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +509,74 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names and field positions for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation extract the field names and positions from. +* @tparam A The type of the TypeInformation. +* @return A tuple of two arrays holding the field names and corresponding field positions. +*/ + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { +validateType(inputType) + +val fieldNames: Array[String] = inputType match { + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case r: RowTypeInfo => r.getFieldNames + case tpe => +throw new TableException(s"Type $tpe lacks explicit field naming") +} +val fieldIndexes = fieldNames.indices.toArray + +if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") +} + +(fieldNames, fieldIndexes) + } + + def validateType(typeInfo: TypeInformation[_]): Unit = { --- End diff -- Sure. 
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772815#comment-15772815 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93761548 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +509,74 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names and field positions for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation extract the field names and positions from. +* @tparam A The type of the TypeInformation. +* @return A tuple of two arrays holding the field names and corresponding field positions. 
+*/ + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { +validateType(inputType) + +val fieldNames: Array[String] = inputType match { + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case r: RowTypeInfo => r.getFieldNames + case tpe => +throw new TableException(s"Type $tpe lacks explicit field naming") +} +val fieldIndexes = fieldNames.indices.toArray + +if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") +} + +(fieldNames, fieldIndexes) + } + + def validateType(typeInfo: TypeInformation[_]): Unit = { +val clazz = typeInfo.getTypeClass +if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + +s"static and globally accessible.") +} + } + + /** +* Returns field types for a given [[TypeInformation]]. +* +* Field types are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. --- End diff -- Ok, this makes sense.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772811#comment-15772811 ] ASF GitHub Bot commented on FLINK-5280: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93761407 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +509,74 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names and field positions for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation extract the field names and positions from. +* @tparam A The type of the TypeInformation. +* @return A tuple of two arrays holding the field names and corresponding field positions. +*/ + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { +validateType(inputType) + +val fieldNames: Array[String] = inputType match { + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case r: RowTypeInfo => r.getFieldNames --- End diff -- Good point, will do that.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772756#comment-15772756 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93758724 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala --- @@ -25,6 +25,6 @@ import org.apache.flink.types.Row /** Table which defines an external table via a [[TableSource]] */ --- End diff -- unused import `org.apache.flink.api.java.typeutils.RowTypeInfo`
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772754#comment-15772754 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93756907 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -19,21 +19,32 @@ package org.apache.flink.table.sources import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableEnvironment -/** Defines an external table by providing schema information, i.e., field names and types. +/** Defines an external table by providing schema information and used to produce a + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]]. + * Schema information consists of a data type, field names, and corresponding indices of + * these names in the data type. + * + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case --- End diff -- `TableSource.getReturnType` -> `TableSource#getReturnType`, use `#` instead of `.` in javadoc.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772758#comment-15772758 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93756487 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -19,21 +19,32 @@ package org.apache.flink.table.sources import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableEnvironment -/** Defines an external table by providing schema information, i.e., field names and types. +/** Defines an external table by providing schema information and used to produce a + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]]. + * Schema information consists of a data type, field names, and corresponding indices of + * these names in the data type. + * + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case + * field names and field indices are deducted from the returned type. + * + * In case if custom field names are required one need to implement both + * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]]. * * @tparam T The return type of the [[TableSource]]. */ trait TableSource[T] { - /** Returns the number of fields of the table. */ - def getNumberOfFields: Int - /** Returns the names of the table fields. */ - def getFieldsNames: Array[String] - - /** Returns the types of the table fields. */ - def getFieldTypes: Array[TypeInformation[_]] + def getFieldsNames: Array[String] = { +TableEnvironment.getFieldInfo(getReturnType)._1 + } + + /** Returns the indices of the table fields. */ + def getFieldsIndices: Array[Int] = { +getFieldsNames.indices.toArray --- End diff -- I think this could be simplified to `getReturnType.getArity.indices.toArray`. 
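A note on the suggestion above: `getArity` on `TypeInformation` returns an `Int`, and a Scala `Int` has no `indices` member, so the expression would presumably have to build a range from the arity instead. The sketch below compares the two forms without depending on Flink. The object `FieldIndices` and its method names are made up for illustration, and the `arity` parameter stands in for `getReturnType.getArity`.

```scala
object FieldIndices {
  // Form used in the PR: derive the indices from the array of field names.
  def fromNames(fieldNames: Array[String]): Array[Int] =
    fieldNames.indices.toArray

  // Assumed intent of the review suggestion: derive the indices from the
  // arity alone. `arity` is a stand-in for `getReturnType.getArity`.
  def fromArity(arity: Int): Array[Int] =
    Array.range(0, arity)
}
```

Both forms give the same result only when the number of field names equals the arity, which holds when the names are derived from the return type itself.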
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772757#comment-15772757 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93757413 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +509,74 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names and field positions for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation extract the field names and positions from. +* @tparam A The type of the TypeInformation. +* @return A tuple of two arrays holding the field names and corresponding field positions. 
+*/ + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { +validateType(inputType) + +val fieldNames: Array[String] = inputType match { + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case r: RowTypeInfo => r.getFieldNames + case tpe => +throw new TableException(s"Type $tpe lacks explicit field naming") +} +val fieldIndexes = fieldNames.indices.toArray + +if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") +} + +(fieldNames, fieldIndexes) + } + + def validateType(typeInfo: TypeInformation[_]): Unit = { +val clazz = typeInfo.getTypeClass +if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + +s"static and globally accessible.") +} + } + + /** +* Returns field types for a given [[TypeInformation]]. +* +* Field types are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation to extract field types from. +* @return an holding the field types. +*/ + def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = { +validateType(inputType) + +inputType match { + case t: TupleTypeInfo[_] => getTypes(t) + case c: CaseClassTypeInfo[_] => getTypes(c) + case p: PojoTypeInfo[_] => getTypes(p) + case r: RowTypeInfo => getTypes(r) + case tpe => +throw new TableException(s"Type $tpe lacks explicit field naming") +} + } + + private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = { +0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray --- End diff -- `c.getTypeAt(_)` can be simplified to `c.getTypeAt` . 
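The simplification relies on Scala converting a method into a function value where a function is expected (eta-expansion), so the `_` placeholder is not needed. A small sketch, assuming a hypothetical `FieldTypes` class in place of Flink's `CompositeType`; its `typeAt` and `arity` members play the roles of `getTypeAt` and the field count and are not Flink API:

```scala
object EtaExpansion {
  // Hypothetical stand-in for a composite type with positional field access.
  final class FieldTypes(types: Vector[String]) {
    def arity: Int = types.length
    def typeAt(pos: Int): String = types(pos)
  }

  // Explicit placeholder syntax, as written in the PR.
  def withPlaceholder(c: FieldTypes): Array[String] =
    0.until(c.arity).map(c.typeAt(_)).toArray

  // Method passed directly; the compiler turns it into a function value.
  def withMethodValue(c: FieldTypes): Array[String] =
    0.until(c.arity).map(c.typeAt).toArray
}
```

The two methods are equivalent here; the second is only shorter.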
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772763#comment-15772763 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93757999 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -340,25 +331,8 @@ abstract class TableEnvironment(val config: TableConfig) { * @return A tuple of two arrays holding the field names and corresponding field positions. */ protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): - (Array[String], Array[Int]) = - { -validateType(inputType) - -val fieldNames: Array[String] = inputType match { - case t: TupleTypeInfo[A] => t.getFieldNames - case c: CaseClassTypeInfo[A] => c.getFieldNames - case p: PojoTypeInfo[A] => p.getFieldNames - case r: RowTypeInfo => r.getFieldNames - case tpe => -throw new TableException(s"Type $tpe lacks explicit field naming") -} -val fieldIndexes = fieldNames.indices.toArray - -if (fieldNames.contains("*")) { - throw new TableException("Field name can not be '*'.") -} - -(fieldNames, fieldIndexes) + (Array[String], Array[Int]) = { +TableEnvironment.getFieldInfo(inputType) --- End diff -- It seems that this is redundant; we can remove it and use the util `getFieldInfo` instead of calling this method.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772759#comment-15772759 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93758297 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +509,74 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names and field positions for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation extract the field names and positions from. +* @tparam A The type of the TypeInformation. +* @return A tuple of two arrays holding the field names and corresponding field positions. 
+*/ + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { +validateType(inputType) + +val fieldNames: Array[String] = inputType match { + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case r: RowTypeInfo => r.getFieldNames + case tpe => +throw new TableException(s"Type $tpe lacks explicit field naming") +} +val fieldIndexes = fieldNames.indices.toArray + +if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") +} + +(fieldNames, fieldIndexes) + } + + def validateType(typeInfo: TypeInformation[_]): Unit = { +val clazz = typeInfo.getTypeClass +if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + +s"static and globally accessible.") +} + } + + /** +* Returns field types for a given [[TypeInformation]]. +* +* Field types are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation to extract field types from. +* @return an holding the field types. +*/ + def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = { +validateType(inputType) + +inputType match { + case t: TupleTypeInfo[_] => getTypes(t) + case c: CaseClassTypeInfo[_] => getTypes(c) + case p: PojoTypeInfo[_] => getTypes(p) + case r: RowTypeInfo => getTypes(r) + case tpe => --- End diff -- Same here, should support `AtomicType`. 
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772765#comment-15772765 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93759808 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala --- @@ -26,7 +26,7 @@ import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory abstract class FlinkTable[T]( -val typeInfo: TypeInformation[T], +val typeInfo: TypeInformation[_], --- End diff -- This can be reverted once you make `TableSourceTable` support a generic type.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772752#comment-15772752 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3039#discussion_r93755631 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -535,4 +509,74 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** +* Returns field names and field positions for a given [[TypeInformation]]. +* +* Field names are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation extract the field names and positions from. +* @tparam A The type of the TypeInformation. +* @return A tuple of two arrays holding the field names and corresponding field positions. 
+*/ + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { +validateType(inputType) + +val fieldNames: Array[String] = inputType match { + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case r: RowTypeInfo => r.getFieldNames + case tpe => +throw new TableException(s"Type $tpe lacks explicit field naming") +} +val fieldIndexes = fieldNames.indices.toArray + +if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") +} + +(fieldNames, fieldIndexes) + } + + def validateType(typeInfo: TypeInformation[_]): Unit = { +val clazz = typeInfo.getTypeClass +if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + +s"static and globally accessible.") +} + } + + /** +* Returns field types for a given [[TypeInformation]]. +* +* Field types are automatically extracted for +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* The method fails if inputType is not a +* [[org.apache.flink.api.common.typeutils.CompositeType]]. +* +* @param inputType The TypeInformation to extract field types from. +* @return an holding the field types. +*/ + def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = { +validateType(inputType) + +inputType match { + case t: TupleTypeInfo[_] => getTypes(t) + case c: CaseClassTypeInfo[_] => getTypes(c) + case p: PojoTypeInfo[_] => getTypes(p) + case r: RowTypeInfo => getTypes(r) --- End diff -- What about use `case c: CompositeType => c.getFieldNames` instead of case every type? 
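The idea is that one pattern on the common supertype can replace the four per-class cases, since `getFieldNames` is declared on `CompositeType` itself. A minimal sketch with hypothetical stand-in types; `TypeInfo`, `Composite`, `TupleInfo`, `RowInfo` and `IntInfo` are not Flink classes and only mimic the shape of the hierarchy:

```scala
object FieldNameLookup {
  // Hypothetical, simplified stand-ins for Flink's type information classes.
  trait TypeInfo
  trait Composite extends TypeInfo { def fieldNames: Array[String] }

  final case class TupleInfo(arity: Int) extends Composite {
    def fieldNames: Array[String] = (0 until arity).map(i => s"f$i").toArray
  }
  final case class RowInfo(fieldNames: Array[String]) extends Composite
  case object IntInfo extends TypeInfo

  // A single case on the supertype covers every composite type.
  def fieldNames(t: TypeInfo): Array[String] = t match {
    case c: Composite => c.fieldNames
    case other =>
      throw new IllegalArgumentException(s"Type $other lacks explicit field naming")
  }
}
```

A new composite type added later is then handled without touching the match, which is the main benefit over listing each class.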
> Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
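The suggestion in the review comment above, matching once on a shared supertype instead of once per concrete subtype, can be sketched in plain Java. This is only an illustration under stated assumptions: `TypeInfo`, `CompositeTypeInfo`, `TupleInfo`, `PojoInfo`, `IntInfo`, and `FieldNames` are hypothetical stand-ins invented for this sketch, not Flink classes.

```java
// Hypothetical stand-ins for a type hierarchy; none of these are Flink classes.
interface TypeInfo {}

// Assumed common supertype of every type that exposes named fields.
interface CompositeTypeInfo extends TypeInfo {
    String[] getFieldNames();
}

final class TupleInfo implements CompositeTypeInfo {
    public String[] getFieldNames() { return new String[] {"f0", "f1"}; }
}

final class PojoInfo implements CompositeTypeInfo {
    public String[] getFieldNames() { return new String[] {"id", "name"}; }
}

// A type without named fields, used to exercise the failure branch.
final class IntInfo implements TypeInfo {}

final class FieldNames {
    private FieldNames() {}

    // One branch on the shared supertype replaces one branch per concrete subtype.
    static String[] of(TypeInfo type) {
        if (type instanceof CompositeTypeInfo) {
            return ((CompositeTypeInfo) type).getFieldNames();
        }
        throw new IllegalArgumentException("Type " + type + " lacks explicit field naming");
    }
}
```

A new composite subtype added later is picked up without touching `FieldNames.of`, which is the maintenance benefit the reviewer appears to be after.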
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772760#comment-15772760 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93757105

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
         return kafkaSource;
       }

    -  @Override
    -  public int getNumberOfFields() {
    -    return fieldNames.length;
    -  }
    -
    -  @Override
       public String[] getFieldsNames() {
    -    return fieldNames;
    +    return TableSource$class.getFieldsNames(this);
       }

    -  @Override
    -  public TypeInformation[] getFieldTypes() {
    -    return fieldTypes;
    +  public int[] getFieldsIndices() {
    +    return TableSource$class.getFieldsIndices(this);
    --- End diff --

    Why do we do it like this? It seems that we do not need to override this method.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772762#comment-15772762 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93756642

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,21 +19,32 @@
     package org.apache.flink.table.sources

     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment

    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
    +  * field names and field indices are deducted from the returned type.
    --- End diff --

    deducted -> derived?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772767#comment-15772767 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93757791

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---
    @@ -38,7 +38,9 @@ class BatchTableSourceScan(

       override def deriveRowType() = {
         val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    -    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
    +    flinkTypeFactory.buildRowDataType(
    +      tableSource.getFieldsNames,
    +      TableEnvironment.getFieldTypes(tableSource.getReturnType))
    --- End diff --

    What if the table source returns an `AtomicType`? It seems that it will fail here.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772770#comment-15772770 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93755496

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    --- End diff --

    What about using `case c: CompositeType => c.getFieldNames` instead of a separate case for every type?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772766#comment-15772766 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93756954

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -19,21 +19,32 @@
     package org.apache.flink.table.sources

     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableEnvironment

    -/** Defines an external table by providing schema information, i.e., field names and types.
    +/** Defines an external table by providing schema information and used to produce a
    +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
    +  * Schema information consists of a data type, field names, and corresponding indices of
    +  * these names in the data type.
    +  *
    +  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
    +  * field names and field indices are deducted from the returned type.
    +  *
    +  * In case if custom field names are required one need to implement both
    +  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
    --- End diff --

    `TableSource.getFieldsNames` => `TableSource#getFieldsNames`
    `TableSource.getFieldsIndices` => `TableSource#getFieldsIndices`

    Use `#` instead of `.` in javadoc.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772753#comment-15772753 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93758640

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala ---
    @@ -44,14 +44,14 @@ abstract class FlinkTable[T](
       val fieldTypes: Array[TypeInformation[_]] =
    --- End diff --

    We can use the `getFieldTypes` util to generate the TypeInformation array and reduce redundant code. We can also put the length check above.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772755#comment-15772755 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93756028

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    +      case tpe =>
    +        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    }
    +    val fieldIndexes = fieldNames.indices.toArray
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    (fieldNames, fieldIndexes)
    +  }
    +
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    --- End diff --

    Could you add a comment describing what this method does?
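As a rough illustration of what the `validateType` method quoted in this thread checks, the same three conditions can be written against the standard Java reflection API. The class name `TypeClassValidator` and its nested example classes are hypothetical and exist only for this sketch; the reflection calls (`Class#isMemberClass`, `Class#getModifiers`, `Class#getCanonicalName`, `Modifier.isStatic`, `Modifier.isPublic`) are the ones used in the diff.

```java
// Hypothetical helper; it mirrors the three conditions of the quoted validateType diff.
final class TypeClassValidator {
    private TypeClassValidator() {}

    // Returns true when the class passes all three checks from the diff.
    static boolean isGloballyAccessible(Class<?> clazz) {
        int modifiers = clazz.getModifiers();
        // A non-static member (inner) class needs an enclosing instance to be created.
        boolean nonStaticMember =
            clazz.isMemberClass() && !java.lang.reflect.Modifier.isStatic(modifiers);
        // A non-public class cannot be referenced from arbitrary packages.
        boolean notPublic = !java.lang.reflect.Modifier.isPublic(modifiers);
        // Local and anonymous classes have no canonical name.
        boolean noCanonicalName = clazz.getCanonicalName() == null;
        return !(nonStaticMember || notPublic || noCanonicalName);
    }

    // Example classes used to exercise each branch.
    public static class StaticNested {}
    public class Inner {}
    static class PackagePrivateNested {}
}
```

Like the quoted method, this looks only at the modifiers of the class itself and does not inspect its enclosing class.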
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772761#comment-15772761 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93757097

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -111,24 +112,17 @@
         return kafkaSource;
       }

    -  @Override
    -  public int getNumberOfFields() {
    -    return fieldNames.length;
    -  }
    -
    -  @Override
       public String[] getFieldsNames() {
    -    return fieldNames;
    +    return TableSource$class.getFieldsNames(this);
    --- End diff --

    Why do we do it like this? It seems that we do not need to override this method.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772768#comment-15772768 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93759469

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    +      case tpe =>
    +        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    }
    +    val fieldIndexes = fieldNames.indices.toArray
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    (fieldNames, fieldIndexes)
    +  }
    +
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    +    val clazz = typeInfo.getTypeClass
    +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
    +      !Modifier.isPublic(clazz.getModifiers) ||
    +      clazz.getCanonicalName == null) {
    +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
    +        s"static and globally accessible.")
    +    }
    +  }
    +
    +  /**
    +    * Returns field types for a given [[TypeInformation]].
    +    *
    +    * Field types are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    --- End diff --

    I think we should not restrict the input type to `CompositeType`, because `TableSource.getReturnType` could return any type, including `AtomicType`.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772764#comment-15772764 ]

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93758185

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    +      case tpe =>
    --- End diff --

    We should extend this to support `AtomicType`; see `UserDefinedFunctionUtil.getFieldInfo(TypeInformation)`. I would also like to combine these methods.
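The review comment above asks for field extraction that also accepts atomic types. A minimal sketch of that idea follows, assuming that an atomic type is exposed as a single field with a placeholder name; the name `f0` and all types here (`SourceType`, `RowLikeType`, `AtomicLikeType`, `FieldInfo`) are assumptions made for illustration and are not taken from the pull request.

```java
// Hypothetical marker for any type a source may return; not a Flink interface.
interface SourceType {}

// Hypothetical composite type: exposes named fields.
final class RowLikeType implements SourceType {
    private final String[] names;

    RowLikeType(String... names) { this.names = names.clone(); }

    String[] getFieldNames() { return names.clone(); }
}

// Hypothetical atomic type: a single value with no inner fields.
final class AtomicLikeType implements SourceType {}

// Field names together with their positions in the underlying type.
final class FieldInfo {
    final String[] names;
    final int[] indices;

    private FieldInfo(String[] names, int[] indices) {
        this.names = names;
        this.indices = indices;
    }

    static FieldInfo of(SourceType type) {
        String[] names;
        if (type instanceof RowLikeType) {
            names = ((RowLikeType) type).getFieldNames();
        } else if (type instanceof AtomicLikeType) {
            // Assumption: an atomic type becomes one field with the placeholder name "f0".
            names = new String[] {"f0"};
        } else {
            throw new IllegalArgumentException("Unsupported type: " + type);
        }
        // Positions are simply 0..n-1 in declaration order.
        int[] indices = new int[names.length];
        for (int i = 0; i < names.length; i++) {
            indices[i] = i;
        }
        return new FieldInfo(names, indices);
    }
}
```

With this shape, a caller that builds a row type from the names and indices no longer fails when the source returns a single atomic value.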
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772514#comment-15772514 ]

ASF GitHub Bot commented on FLINK-5280:
---

GitHub user mushketyk opened a pull request:

    https://github.com/apache/flink/pull/3039

    [FLINK-5280] Update TableSource to support nested data

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.

    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink nested-table-source

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3039.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3039

commit 7bd26239dd9a7c41f09fbe070baa70b19278c51f
Author: Ivan Mushketyk
Date:   2016-12-22T21:26:34Z

    [FLINK-5280] Update TableSource to support nested data
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15771636#comment-15771636 ]

Jark Wu commented on FLINK-5280:

Sounds great. +1 for getting this into 1.2 if we can make it in time.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770522#comment-15770522 ]

Fabian Hueske commented on FLINK-5280:
--

Maybe we can still squeeze this change into the 1.2.0 release. That would be good since we are changing the interface of {{TableSource}}.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770509#comment-15770509 ]

Fabian Hueske commented on FLINK-5280:
--

Great, I just merged FLINK-5348 :-)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770498#comment-15770498 ]

Ivan Mushketyk commented on FLINK-5280:
---

Hi guys,

I like your idea, Fabian. I'll start working on it and see how it goes.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770065#comment-15770065 ]

Fabian Hueske commented on FLINK-5280:
--

It seems that these util methods have been implemented a couple of times. As I said, there is also {{TableEnvironment.getFieldInfo()}} for names and indices and {{FlinkTable}} for the types. It would be good to have all of that in one place.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770018#comment-15770018 ]

Jark Wu commented on FLINK-5280:

Hi guys,

This is a very good extension of the discussion. It seems that I'm late to it :). I will still post my ideas below.

I think {{getFieldTypes()}} and {{getNumberOfFields()}} can always be derived from {{getReturnType()}}, so I would like to move them into a util class, which will keep the interface clean. Actually, we already have {{UserDefinedFunctionUtil#getFieldInfo(TypeInformation)}} and {{TableEnvironment.getFieldInfo(TypeInformation)}}, which return field types, names, and indices. We can refactor them, move them to a better place, and maybe split them into variants (i.e. {{getFieldNames}}, {{getFieldIndicies}}, {{getFieldTypes}}). We can then provide the default implementations of {{getFieldNames}} and {{getFieldIndicies}} based on the util.
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15769949#comment-15769949 ]

Fabian Hueske commented on FLINK-5280:
--

How about the following:

* We make the {{TableEnvironment.getFieldInfo()}} methods statically accessible, i.e., we move them into the TableEnvironment companion object, and call this method for the default implementation of {{TableSource.getFieldNames()}} and {{TableSource.getFieldIndicies()}}. This way we avoid code duplication.
* We remove {{getFieldTypes()}} and {{getNumberOfFields()}}. These methods are in fact not necessary for the interface. Have a look at how a regular DataSet is converted into a Table. The field types are extracted in {{FlinkTable}}. {{TableSourceTable}} overrides this logic. We could simply keep the {{FlinkTable}} logic.
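The first point of the proposal above, default implementations of the name and index accessors that delegate to one statically accessible helper, can be sketched with Java 8 default methods. This is an analogy under stated assumptions rather than the Scala trait from the pull request: `ReturnTypeSketch`, `FieldInfoUtil`, `TableSourceSketch`, and `NestedSourceSketch` are hypothetical names introduced only for this example.

```java
// Hypothetical, simplified stand-in for a composite return type.
final class ReturnTypeSketch {
    private final String[] fieldNames;

    ReturnTypeSketch(String... fieldNames) { this.fieldNames = fieldNames.clone(); }

    String[] getFieldNames() { return fieldNames.clone(); }
}

// Hypothetical helper playing the role of the statically accessible utility.
final class FieldInfoUtil {
    private FieldInfoUtil() {}

    static String[] fieldNames(ReturnTypeSketch type) {
        return type.getFieldNames();
    }

    static int[] fieldIndices(ReturnTypeSketch type) {
        int[] indices = new int[type.getFieldNames().length];
        for (int i = 0; i < indices.length; i++) {
            indices[i] = i;
        }
        return indices;
    }
}

// Only getReturnType() must be implemented; names and indices are derived from it.
interface TableSourceSketch {
    ReturnTypeSketch getReturnType();

    default String[] getFieldsNames() {
        return FieldInfoUtil.fieldNames(getReturnType());
    }

    default int[] getFieldsIndices() {
        return FieldInfoUtil.fieldIndices(getReturnType());
    }
}

// A source that relies entirely on the defaults.
final class NestedSourceSketch implements TableSourceSketch {
    public ReturnTypeSketch getReturnType() {
        return new ReturnTypeSketch("id", "address");
    }
}
```

A source that needs custom field names would override both accessors together, so that names and indices stay consistent with each other.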