[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15816248#comment-15816248
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3039


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15815979#comment-15815979
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @fhueske, @wuchong 

Thank you for your reviews and your help with this PR.
I've updated the PR.





[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15815203#comment-15815203
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
Merging




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814642#comment-15814642
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r95339229
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,22 +19,23 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
+  * field names and field indices are derived from the returned type.
+  *
+  * In case if custom field names are required one need to additionally implement
--- End diff --

My mistake, `In case if` is fine here.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814559#comment-15814559
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r95333198
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,22 +19,23 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
+  * field names and field indices are derived from the returned type.
+  *
+  * In case if custom field names are required one need to additionally implement
--- End diff --

I am not sure about this. I've checked it with [Grammarly](grammarly.com): 
it does not complain about "In case if", but it does complain about "In case 
of".




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813852#comment-15813852
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r95292990
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,22 +19,23 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
+  * field names and field indices are derived from the returned type.
+  *
+  * In case if custom field names are required one need to additionally implement
--- End diff --

In case if  -> In case of




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812080#comment-15812080
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r95178608
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
 ---
@@ -53,7 +53,9 @@ class CsvTableSource(
 ignoreFirstLine: Boolean = false,
 ignoreComments: String = null,
 lenient: Boolean = false)
-  extends AbstractBatchStreamTableSource[Row]
+  extends BatchTableSource[Row]
+  with StreamTableSource[Row]
+  with DefinedFieldNames
--- End diff --

If we define `returnType` as `new RowTypeInfo(fieldTypes, fieldNames)`, we 
do not need to implement `DefinedFieldNames`.
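
The point above can be sketched with simplified stand-in types. Everything in 
this block (`SimpleRowType`, `SimpleTableSource`, `CsvLikeSource`, and the 
`f0`, `f1`, ... fallback names) is a hypothetical illustration, not Flink's 
actual `RowTypeInfo` or `CsvTableSource` code. It only shows the idea that a 
return type carrying its own field names makes a separate naming interface 
unnecessary.

```scala
// Simplified stand-ins for illustration; these are NOT Flink's real classes.
object RowTypeNamesSketch {

  // Mimics a row type that carries one name per field type.
  final case class SimpleRowType(fieldTypes: Seq[String], fieldNames: Seq[String]) {
    require(fieldTypes.length == fieldNames.length, "one name per field type")
  }

  object SimpleRowType {
    // Assumption of this sketch: without explicit names, generate f0, f1, ...
    def apply(fieldTypes: Seq[String]): SimpleRowType =
      SimpleRowType(fieldTypes, fieldTypes.indices.map(i => s"f$i"))
  }

  // A source whose only schema-related method is getReturnType.
  trait SimpleTableSource {
    def getReturnType: SimpleRowType
  }

  final class CsvLikeSource(types: Seq[String], names: Seq[String])
      extends SimpleTableSource {
    // The names travel inside the return type, so no extra interface is needed.
    override def getReturnType: SimpleRowType = SimpleRowType(types, names)
  }
}
```

With this shape, a consumer reads `source.getReturnType.fieldNames` and never 
needs to ask the source for names separately.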




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15811150#comment-15811150
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @fhueske, @wuchong 

I've updated the PR according to your feedback.
Could you please review it again?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804881#comment-15804881
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
@fhueske @wuchong Makes sense to me as well.

I'll try to update the PR during the weekend.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804396#comment-15804396
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3039
  
Makes sense to me.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804349#comment-15804349
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
@mushketyk  no worries :-)

@wuchong Since the methods are only added when a table source implements the 
interface, there is no default implementation. The logic of the default 
implementation (calling the static method of `TableEnvironment`) is put 
directly into the `TableSourceTable` and is only replaced if the table source 
implements the new interface.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804341#comment-15804341
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3039
  
I like this idea. This way, we only need to provide the `BatchTableSource` 
and `StreamTableSource` interfaces, without the odd 
`BatchStreamTableSource`, and we can keep the interface very clean.

If I understand correctly, none of the concrete implementations of 
`TableSource` will implement `DefinedFieldNames` for now?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804333#comment-15804333
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
@fhueske 

Sorry, there are only two methods. Please ignore my comment :)

I think you are right and this seems like a good approach. If @wuchong is 
on board with this I'll update the PR accordingly.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804300#comment-15804300
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
Which `getTypeIndices` method are you referring to? `TableSource` only has 
`getReturnType`, `getFieldNames` and `getFieldIndices`. If we move the latter 
two to a separate interface, only `getReturnType` is left.

Also, I think this is typical OO design. We do not need reflection to check 
whether an object implements an interface. That's a very common operation in 
Java and Scala. A simple `isInstanceOf[DefinedFieldNames]` in 
`TableSourceTable` is sufficient to check whether the table source implements 
the interface or not.

Isn't this a good compromise between having a lean interface (also simple for 
Java users) and still being able to override field names if necessary?
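
A minimal sketch of the check described above, using stand-in types rather 
than Flink's real ones. `SimpleTableSource`, `DefinesCustomNames`, 
`typeFieldNames` and `resolveFieldNames` are hypothetical names introduced for 
illustration; only the idea of an optional mixin that is tested with a plain 
type check comes from the discussion.

```scala
// Stand-in types for illustration only; not Flink's actual API.
object FieldNameResolutionSketch {

  // Lean source interface: only what every source must provide.
  trait SimpleTableSource {
    // Field names as derived from the returned type (hypothetical simplification).
    def typeFieldNames: Seq[String]
  }

  // Optional mixin, modeled on the DefinedFieldNames idea from this thread.
  trait DefinesCustomNames {
    def getFieldNames: Seq[String]
    def getFieldIndices: Seq[Int]
  }

  // What a TableSourceTable-like registration step could do.
  // The typed pattern is an ordinary runtime type test, not reflection.
  def resolveFieldNames(source: SimpleTableSource): Seq[String] = source match {
    case custom: DefinesCustomNames => custom.getFieldNames  // source overrides the names
    case _                          => source.typeFieldNames // default: names from the type
  }

  final class PlainSource extends SimpleTableSource {
    override def typeFieldNames: Seq[String] = Seq("f0", "f1")
  }

  final class RenamingSource extends SimpleTableSource with DefinesCustomNames {
    override def typeFieldNames: Seq[String] = Seq("f0", "f1")
    override def getFieldNames: Seq[String] = Seq("id", "name")
    override def getFieldIndices: Seq[Int] = Seq(0, 1)
  }
}
```

The pattern match is equivalent to an `isInstanceOf` test followed by a cast. 
A Java implementer who does not need custom names only has to implement the 
lean interface.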




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804281#comment-15804281
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
I don't think we will gain much with this. Even if we remove these two 
methods from the `TableSource` trait, there is still the `getTypeIndices` 
method, and Java users will have to call it if they are going to implement the 
`TableSource` trait. And if a user knows how to inherit a trait with one 
method, he/she will be able to inherit a trait with three methods.

The second problem with this approach is that it's not really 
object-oriented. We will have to rely on reflection tricks (probably sugared 
with pattern matching), whereas simply having three methods is a cleaner OO 
solution.

What if we leave all three methods and simply add some base Java 
implementations that already implement these traits? Something like 
`JavaBatchTableSource`, `JavaStreamTableSource`, and 
`JavaBatchStreamTableSource`? Then users will not need to struggle with the 
trait inheritance issues.
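
The base-class alternative proposed above could look roughly like the 
following sketch. The names `ThreeMethodSource`, `BaseSource`, `MySource` and 
`getReturnTypeNames` are hypothetical and the types are heavily simplified; 
this is not the code from the PR. The underlying assumption is that an 
abstract class compiles to an ordinary JVM class, so a Java subclass would 
inherit its concrete methods directly, which was not the case for concrete 
trait methods on the Scala versions in use at the time.

```scala
// Hypothetical illustration of the "base implementation" idea; not Flink code.
object JavaFriendlyBaseSketch {

  // All three schema methods stay on the source interface.
  trait ThreeMethodSource {
    def getReturnTypeNames: Seq[String] // stand-in for getReturnType
    def getFieldNames: Seq[String]
    def getFieldIndices: Seq[Int]
  }

  // Base class that supplies the defaults once, so subclasses
  // only have to describe the return type.
  abstract class BaseSource extends ThreeMethodSource {
    override def getFieldNames: Seq[String] = getReturnTypeNames
    override def getFieldIndices: Seq[Int] = getReturnTypeNames.indices
  }

  // A concrete source implements a single method and inherits the rest.
  final class MySource extends BaseSource {
    override def getReturnTypeNames: Seq[String] = Seq("a", "b", "c")
  }
}
```

The trade-off raised in the thread still applies: this keeps three methods on 
the main interface and adds one base class per source flavor.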




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804149#comment-15804149
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
We remove the methods from `TableSource` and add them to an interface. If a 
table source does not implement the methods, we use the names provided by the 
`TypeInformation`. If the table source implements the methods, we use those 
names. The distinction is made in `TableSourceTable`.

What do you think?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804108#comment-15804108
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
I don't think I got the idea :) Could you elaborate on it?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804100#comment-15804100
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
I just had a crazy thought. :-) 
What do you think about moving `getFieldNames()` and `getFieldIndices()` 
into a separate trait/interface, maybe `DefinedFieldNames`?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803367#comment-15803367
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3039
  
I'm fine with removing `getFieldNames` and `getFieldIndices`. The Scala trait 
approach makes it too hard for Java users who do not know this trick to 
implement a custom `TableSource`.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802752#comment-15802752
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @fhueske, @wuchong 

I think we can solve the problem with the current `TableSource` class 
hierarchy in a different way. If we remove all abstract classes and move all 
default implementations into the `TableSource` trait, the class hierarchy 
becomes much simpler. The only drawback is that Java users will need to 
provide implementations of the trait methods that explicitly [call default 
implementations](http://stackoverflow.com/a/7637888).

I don't think this is bad, since it's a common way to extend Scala traits. We 
can additionally remove `getFieldNames` and `getFieldIndices` if you think they 
are superfluous, but I don't think there is a big difference.

What do you think, @fhueske, @wuchong?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802730#comment-15802730
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94865876
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+  * Partial implementation of the [[BatchStreamTableSource]] trait.
+  *
+  * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]].
+  */
+abstract class AbstractBatchStreamTableSource[T]
+extends AbstractTableSource[T]
+with BatchStreamTableSource[T] {
--- End diff --

Good point. I'll update this.

What do you think about the overall design of the TableSource-related classes?





[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802703#comment-15802703
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94859631
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +511,92 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]]
+* or [[org.apache.flink.api.common.typeinfo.AtomicType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]]
+* or [[org.apache.flink.api.common.typeinfo.AtomicType]].
+*
+* @param inputType The TypeInformation extract the field names.
+* @tparam A The type of the TypeInformation.
+* @return A an array holding the field names
+*/
+  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: CompositeType[_] => t.getFieldNames
+  case a: AtomicType[_] => Array("f0")
+  case tpe =>
+throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
+  s"Type $tpe lacks explicit field naming")
+}
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+fieldNames
+  }
+
+  /**
+* Validate if class represented by the typeInfo is static and globally accessible
+* @param typeInfo type to check
+* @throws TableException if type does not meet these criteria
+*/
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field indexes for a given [[TypeInformation]].
+*
+* Field indexes are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]]
+* or [[org.apache.flink.api.common.typeinfo.AtomicType]].
+* The method fails if inputType is not a
--- End diff --

No need to mention this, IMO.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802702#comment-15802702
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94857414
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -31,7 +31,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeIn
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableEnvironment}
--- End diff --

Can this import be removed?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802705#comment-15802705
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94856759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
 ---
@@ -120,6 +124,7 @@ trait FlinkRel {
 case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
 case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
 case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
+case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(0).getType()).asInstanceOf[Int]
--- End diff --

Shouldn't `get(0)` access the type at the index of the field currently being 
folded, instead of always the first one?
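
To make the concern concrete, here is a minimal sketch of a fold that recurses into the field currently being folded. The `FieldType` model and the byte sizes are hypothetical simplifications; the real code works on Calcite's `RelDataType` and `SqlTypeName`.

```scala
// Hypothetical, simplified type model; not Calcite's RelDataType.
sealed trait FieldType
case object IntField extends FieldType
case object LongField extends FieldType
final case class RowField(fields: List[FieldType]) extends FieldType

object RowSizeSketch {
  // Fold over the fields and recurse into the field currently being folded.
  def estimateRowSize(fields: List[FieldType]): Int =
    fields.foldLeft(0) { (size, field) =>
      field match {
        case IntField         => size + 4 // illustrative size
        case LongField        => size + 8 // illustrative size
        case RowField(nested) => size + estimateRowSize(nested) // current field, not fields.head
      }
    }
}
```

If the nested case always looked at the first field instead, every nested row would be sized as if it were the first one, and a non-row first field would not be handled sensibly.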




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802699#comment-15802699
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94852514
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
 ---
@@ -92,4 +90,25 @@ public void testBatchTableSourceSQL() throws Exception {
compareResultAsText(results, expected);
}
 
+   @Test
+   public void testNestedBatchTableSourceSQL() throws Exception {
--- End diff --

The Scala SQL ITCase should be sufficient to test this feature.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802706#comment-15802706
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94858739
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +511,92 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]]
+* or [[org.apache.flink.api.common.typeinfo.AtomicType]].
+* The method fails if inputType is not a
--- End diff --

I don't think we need to mention this. All Flink types are either composite 
or atomic (even though this is not strictly enforced).




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802700#comment-15802700
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94851899
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+  * Partial implementation of the [[BatchStreamTableSource]] trait.
+  *
+  * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]].
+  */
+abstract class AbstractBatchStreamTableSource[T]
+extends AbstractTableSource[T]
+with BatchStreamTableSource[T] {
--- End diff --

Can't we extend from `BatchTableSource` and `StreamTableSource` instead of 
having an additional `BatchStreamTableSource`?
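
A minimal sketch of what that could look like, assuming heavily simplified traits: `Seq` and `Iterator` stand in for `DataSet` and `DataStream`, and `NumbersSource` is a hypothetical implementer.

```scala
// Hypothetical, simplified versions of the traits discussed in this review.
trait TableSource[T] { def getFieldNames: Array[String] }
trait BatchTableSource[T] extends TableSource[T] { def getDataSet: Seq[T] }
trait StreamTableSource[T] extends TableSource[T] { def getDataStream: Iterator[T] }

// Partial implementation shared by all sources.
abstract class AbstractTableSource[T] extends TableSource[T] {
  override def getFieldNames: Array[String] = Array("f0")
}

// Mixes in both traits directly; no combined BatchStreamTableSource trait is needed.
abstract class AbstractBatchStreamTableSource[T]
  extends AbstractTableSource[T]
  with BatchTableSource[T]
  with StreamTableSource[T]

class NumbersSource extends AbstractBatchStreamTableSource[Int] {
  override def getDataSet: Seq[Int] = Seq(1, 2, 3)
  override def getDataStream: Iterator[Int] = Iterator(1, 2, 3)
}
```

An instance of `NumbersSource` can then be passed wherever a `BatchTableSource[Int]` or a `StreamTableSource[Int]` is expected.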




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802701#comment-15802701
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94851171
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,28 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
+  * field names and field indices are derived from the returned type.
+  *
+  * In case if custom field names are required one need to implement both
+  * [[TableSource#getFieldsNames]] and [[TableSource#getFieldsIndices]].
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
--- End diff --

Please rename `getFieldsNames()` to `getFieldNames()` (the original 
`getFieldsNames()` looks like a typo to me) and `getFieldsIndices()` to 
`getFieldIndices()`.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802704#comment-15802704
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94853135
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
 ---
@@ -60,4 +67,52 @@ object CommonTestData {
   ignoreComments = "%"
 )
   }
+
+  def getNestedTableSource: BatchTableSource[Person] = {
+new AbstractBatchTableSource[Person] {
+  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Person] = {
+val executionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+executionEnvironment.fromCollection(
+  util.Arrays.asList(
+new Person("Mike", "Smith", new Address("5th Ave", "New-York")),
+new Person("Sally", "Miller", new Address("Potsdamer Platz", "Berlin")),
+new Person("Bob", "Taylor", new Address("Pearse Street", "Dublin"))),
+  getReturnType
+)
+  }
+
+  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
+  override def getReturnType: TypeInformation[Person] = new PojoTypeInfo[Person](
--- End diff --

`TypeExtractor.getForClass(classOf[Person])` should return a 
`PojoTypeInfo[Person]`. No need to fiddle with Java reflection ;-)




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15802707#comment-15802707
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94859657
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +511,92 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]]
+* or [[org.apache.flink.api.common.typeinfo.AtomicType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]]
+* or [[org.apache.flink.api.common.typeinfo.AtomicType]].
+*
+* @param inputType The TypeInformation extract the field names.
+* @tparam A The type of the TypeInformation.
+* @return A an array holding the field names
+*/
+  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: CompositeType[_] => t.getFieldNames
+  case a: AtomicType[_] => Array("f0")
+  case tpe =>
+throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
+  s"Type $tpe lacks explicit field naming")
+}
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+fieldNames
+  }
+
+  /**
+* Validate if class represented by the typeInfo is static and globally accessible
+* @param typeInfo type to check
+* @throws TableException if type does not meet these criteria
+*/
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field indexes for a given [[TypeInformation]].
+*
+* Field indexes are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]]
+* or [[org.apache.flink.api.common.typeinfo.AtomicType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]]
+* or [[org.apache.flink.api.common.typeinfo.AtomicType]].
+*
+* @param inputType The TypeInformation extract the field positions from.
+* @return A an array holding the field positions
+*/
+  def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
+getFieldNames(inputType).indices.toArray
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]]
+* or [[org.apache.flink.api.common.typeinfo.AtomicType]].
+* The method fails if inputType is not a
--- End diff --

No need to mention this, IMO.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15796909#comment-15796909
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3039
  
What about extracting `getDataSet(ExecutionEnvironment)` and 
`getDataStream(StreamExecutionEnvironment)` into interfaces called something 
like `DataSetGetter` and `DataStreamGetter`?

Then we can make `BatchTableSource` extend the `TableSource` abstract class and 
implement the `DataSetGetter` interface, make `StreamTableSource` extend the 
`TableSource` abstract class and implement the `DataStreamGetter` interface, 
and make `BatchStreamTableSource` implement both `DataSetGetter` and 
`DataStreamGetter`. That way we can use `TableSource` plus `DataSetGetter` 
wherever only a `BatchTableSource` is expected. For example, 
`BatchTableSourceScan` could be changed to something like this: 

```scala
class BatchTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
val tableSource: TableSource[_],
val datasetGetter: DataSetGetter)
```

Would this solve our problem?
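
To show how the pieces could fit together, here is a self-contained sketch of the getter idea. Everything in it is an assumption made for illustration: the type parameters on the getters, the use of `Seq` and `Iterator` in place of `DataSet` and `DataStream`, and the `BatchScan` and `Numbers` classes.

```scala
// Hypothetical stand-ins; the real methods would take an ExecutionEnvironment or
// StreamExecutionEnvironment and return a Flink DataSet or DataStream.
trait DataSetGetter[T] { def getDataSet: Seq[T] }
trait DataStreamGetter[T] { def getDataStream: Iterator[T] }

abstract class TableSource[T] {
  def getFieldNames: Array[String] = Array("f0")
}

abstract class BatchTableSource[T] extends TableSource[T] with DataSetGetter[T]
abstract class StreamTableSource[T] extends TableSource[T] with DataStreamGetter[T]
abstract class BatchStreamTableSource[T]
  extends TableSource[T] with DataSetGetter[T] with DataStreamGetter[T]

// A consumer asks for the two capabilities separately, as in the proposed scan node.
class BatchScan[T](val tableSource: TableSource[T], val dataSetGetter: DataSetGetter[T]) {
  def rows: Seq[T] = dataSetGetter.getDataSet
}

class Numbers extends BatchStreamTableSource[Int] {
  override def getDataSet: Seq[Int] = Seq(1, 2)
  override def getDataStream: Iterator[Int] = Iterator(1, 2)
}
```

A `Numbers` instance is not a `BatchTableSource`, but it can still be handed to `BatchScan` twice, once as the `TableSource` and once as the `DataSetGetter`. The cost is that every consumer has to carry two references to the same object.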






[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15795541#comment-15795541
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @wuchong 

>> And provide three other abstract class : BatchTableSource (with 
getDataSet interface) , StreamTableSource (with getDataStream interface) and 
BatchStreamTableSource (with both interfaces), they all extend TableSource

I thought about this, but in that case we wouldn't be able to use an instance 
of `BatchStreamTableSource` where an instance of `BatchTableSource` or 
`StreamTableSource` is expected, which seems to make the 
`BatchStreamTableSource` abstract class useless.

I believe we should be able to use `BatchStreamTableSource` wherever a 
`TableSource`, `BatchTableSource`, or `StreamTableSource` is expected. This 
requires multiple inheritance, which is only possible with traits. But since we 
also want to provide partial implementations of these types, I've added 
several abstract classes for users to inherit from.

I see the issue with the current approach, but I am not sure how to simplify it 
while still achieving all the required goals. Would better documentation do the 
trick?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15793958#comment-15793958
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @mushketyk, thanks for the update. 

Regarding `TableSource`: currently there are 8 `TableSource` interfaces exposed 
to users, counting traits and abstract classes. That makes it a little hard to 
choose which one to use when implementing a custom table source. What about 
implementing `TableSource` as an abstract class that provides default 
implementations of `getFieldNames()` and `getFieldsIndices()`, and providing 
three other abstract classes: `BatchTableSource` (with the `getDataSet` 
interface), `StreamTableSource` (with the `getDataStream` interface), and 
`BatchStreamTableSource` (with both interfaces), all extending `TableSource`? 
This way we expose only 4 classes to users, and implementers extend one of the 
latter three abstract classes. What do you think?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15786387#comment-15786387
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @fhueske, @wuchong 

I've updated my PR according to your reviews.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15784253#comment-15784253
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94095173
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -268,23 +268,9 @@ object UserDefinedFunctionUtils {
   def getFieldInfo(inputType: TypeInformation[_])
--- End diff --

I think we can refactor this: provide a static `getFieldInfo` method, get 
the field names from it, and then do the additional check outside. What do 
you think?
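
A rough sketch of what this refactoring could look like, written in plain Java with `String[]` standing in for `TypeInformation`. The class name, the `getFieldInfoForStream` method, and the "rowtime" rule are assumptions made up for illustration; they are not taken from this PR.

```java
import java.util.Arrays;

public class FieldInfoRefactorSketch {

    /** Shared static helper: validates and returns the field names. */
    public static String[] getFieldInfo(String[] typeFieldNames) {
        if (Arrays.asList(typeFieldNames).contains("*")) {
            throw new IllegalArgumentException("Field name can not be '*'.");
        }
        return typeFieldNames.clone();
    }

    /**
     * Hypothetical caller that reuses the helper and performs its own
     * additional check outside of it.
     */
    public static String[] getFieldInfoForStream(String[] typeFieldNames) {
        String[] names = getFieldInfo(typeFieldNames);
        // Assumed extra rule, for illustration only.
        if (Arrays.asList(names).contains("rowtime")) {
            throw new IllegalArgumentException("'rowtime' is assumed to be reserved here.");
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(getFieldInfoForStream(new String[] {"a", "b"})));
    }
}
```

With this split, the environment-specific method no longer needs to override the shared logic; it only wraps it.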




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15783939#comment-15783939
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r94087339
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -268,23 +268,9 @@ object UserDefinedFunctionUtils {
   def getFieldInfo(inputType: TypeInformation[_])
--- End diff --

`getFieldInfo` is overridden in `StreamTableEnvironment`, so I cannot make 
it a static method.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15780914#comment-15780914
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @fhueske, @wuchong 

Thank you for your reviews.

I think creating three abstract classes is a good idea, since we don't 
expect any new types of table sources, so there will not be a lot of 
combinations.

I'll try to update the PR tomorrow according to all current comments.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15780667#comment-15780667
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
Thanks for working on this @mushketyk and the reviews @wuchong.
I just added a comment regarding the Scala trait with implemented methods. 
I'll do a more thorough review in the next few days.

Thanks, Fabian




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15780663#comment-15780663
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93943868
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

Hi,

I'm not sure about implementing this as a Scala trait with implemented 
methods. IMO, this makes it much harder to implement TableSources in Java (esp. 
for users who are not familiar with Scala and its implications). 

What do you think about implementing `TableSource` as an abstract class and 
providing three other abstract classes that extend `TableSource` with the 
batch, the stream, and both interfaces?
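
To make the proposal concrete, here is a minimal Java sketch of the abstract-class approach. It uses a plain `String[]` instead of `TypeInformation`, and every name in it (`TableSourceBase`, `StreamTableSourceBase`, `DemoSource`, `getReturnTypeFieldNames`) is hypothetical rather than the API of this PR.

```java
import java.util.stream.IntStream;

public class AbstractTableSourceSketch {

    /** Hypothetical base class: the default schema logic lives in one place. */
    public abstract static class TableSourceBase {

        /** The only schema method an implementor must provide in this sketch. */
        public abstract String[] getReturnTypeFieldNames();

        /** By default, the field names are derived from the return type. */
        public String[] getFieldsNames() {
            return getReturnTypeFieldNames();
        }

        /** By default, table field i maps to field i of the return type. */
        public int[] getFieldsIndices() {
            return IntStream.range(0, getFieldsNames().length).toArray();
        }
    }

    /** Hypothetical streaming variant; a batch variant would look alike. */
    public abstract static class StreamTableSourceBase extends TableSourceBase {
        public abstract String describeStream();
    }

    /** A Java user extends the abstract class and inherits the defaults. */
    public static class DemoSource extends StreamTableSourceBase {
        @Override
        public String[] getReturnTypeFieldNames() {
            return new String[] {"id", "payload"};
        }

        @Override
        public String describeStream() {
            return "demo";
        }
    }

    public static void main(String[] args) {
        DemoSource source = new DemoSource();
        System.out.println(String.join(",", source.getFieldsNames()));
        System.out.println(source.getFieldsIndices().length);
    }
}
```

A Java implementor then never has to call into a compiler-generated helper such as `TableSource$class`, which is the main usability concern with the trait-based variant.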




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775059#comment-15775059
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93818113
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +510,77 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: CompositeType[_] => t.getFieldNames
+  case a: AtomicType[_] => Array("f0")
+  case tpe =>
+throw new TableException(s"Currently only CompositeType and 
AtomicType are supported. " +
+  s"Type $tpe lacks explicit field naming")
+}
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+fieldNames
+  }
+
+  /**
+* Validate if class represented by the typeInfo is static and globally 
accessible
+* @param typeInfo type to check
+* @throws TableException if type does not meet these criteria
+*/
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
--- End diff --

It would be better to add a doc comment above, since this is a public method.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775051#comment-15775051
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93816786
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -522,26 +523,29 @@ object TableEnvironment {
 * @tparam A The type of the TypeInformation.
 * @return A tuple of two arrays holding the field names and 
corresponding field positions.
 */
-  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
--- End diff --

The javadoc should be updated, since the method's signature and return type have changed.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775056#comment-15775056
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93818167
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource#getReturnType]]. In this case
+  * field names and field indices are derived from the returned type.
+  *
+  * In case if custom field names are required one need to implement both
+  * [[TableSource#getFieldsNames]] and [[TableSource#getFieldsIndices]].
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
 
-  /** Returns the number of fields of the table. */
-  def getNumberOfFields: Int
-
   /** Returns the names of the table fields. */
-  def getFieldsNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
+  def getFieldsNames: Array[String] = {
+TableEnvironment.getFieldNames(getReturnType)
+  }
+
+  /** Returns the indices of the table fields. */
+  def getFieldsIndexes: Array[Int] = {
--- End diff --

The plural of index should be indices.





[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775057#comment-15775057
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93818063
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
 ---
@@ -44,14 +44,14 @@ abstract class FlinkTable[T](
 
   val fieldTypes: Array[TypeInformation[_]] =
 typeInfo match {
-  case cType: CompositeType[T] =>
+  case cType: CompositeType[_] =>
--- End diff --

The fieldTypes can be obtained from `TableEnvironment.getFieldTypes(typeInfo)`, 
and the length validation could be placed above.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775060#comment-15775060
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93817904
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

It would be better to add a comment explaining why we do this.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775052#comment-15775052
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93818018
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +510,77 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
--- End diff --

Please update the javadoc.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775047#comment-15775047
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93816520
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -268,23 +268,9 @@ object UserDefinedFunctionUtils {
   def getFieldInfo(inputType: TypeInformation[_])
--- End diff --

Can we move this method into the `TableEnvironment` object? It's better to keep 
the field-info-related methods in one place.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775048#comment-15775048
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93816761
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -552,6 +556,10 @@ object TableEnvironment {
 }
   }
 
+  def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
--- End diff --

The plural of index should be indices. Please also add a comment describing 
what this method does.
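
For illustration, a renamed and documented version could look roughly like the Java sketch below. It takes the field names directly instead of a `TypeInformation`, and the class name is a placeholder; the body mirrors the `indices.toArray` logic of the Scala method under review.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class FieldIndicesSketch {

    /**
     * Returns the positions of the given fields, i.e. 0, 1, ..., n - 1,
     * where n is the number of field names.
     *
     * @param fieldNames the field names of the type
     * @return an array holding one index per field, in field order
     */
    public static int[] getFieldIndices(String[] fieldNames) {
        return IntStream.range(0, fieldNames.length).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(getFieldIndices(new String[] {"a", "b", "c"})));
    }
}
```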




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775055#comment-15775055
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93818027
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +510,77 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: CompositeType[_] => t.getFieldNames
+  case a: AtomicType[_] => Array("f0")
+  case tpe =>
+throw new TableException(s"Currently only CompositeType and 
AtomicType are supported. " +
+  s"Type $tpe lacks explicit field naming")
+}
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+fieldNames
+  }
+
+  /**
+* Validate if class represented by the typeInfo is static and globally 
accessible
+* @param typeInfo type to check
+* @throws TableException if type does not meet these criteria
+*/
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
+getFieldNames(inputType).indices.toArray
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
--- End diff --

An array holding the field types.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775049#comment-15775049
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93818006
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +510,77 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
--- End diff --

The comment should be updated since AtomicType is also supported.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775054#comment-15775054
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93816451
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -566,17 +574,13 @@ object TableEnvironment {
   def getFieldTypes(inputType: TypeInformation[_]): 
Array[TypeInformation[_]] = {
 validateType(inputType)
 
-inputType match {
-  case t: TupleTypeInfo[_] => getTypes(t)
-  case c: CaseClassTypeInfo[_] => getTypes(c)
-  case p: PojoTypeInfo[_] => getTypes(p)
-  case r: RowTypeInfo => getTypes(r)
-  case tpe =>
-throw new TableException(s"Type $tpe lacks explicit field naming")
+getFieldNames(inputType).map { i =>
--- End diff --

I think this may be error-prone. The field types array should be mapped by 
index, not by field name. For example, the order of a PojoType's field names 
does not necessarily match the order of its field types. The original code in 
`UserDefinedFunctionUtils.getFieldInfo` may be wrong too.
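
A minimal Java sketch of the index-based alternative follows. `SimpleComposite` is a hypothetical stand-in for a composite type with positional access; it is not Flink's `CompositeType`, and the method names on it are assumptions for illustration.

```java
import java.util.Arrays;

public class FieldTypesByIndexSketch {

    /** Hypothetical stand-in for a composite type with positional access. */
    public static final class SimpleComposite {
        private final String[] names;
        private final Class<?>[] types;

        public SimpleComposite(String[] names, Class<?>[] types) {
            if (names.length != types.length) {
                throw new IllegalArgumentException("names and types must have the same length");
            }
            this.names = names.clone();
            this.types = types.clone();
        }

        public int getArity() {
            return names.length;
        }

        public String getFieldName(int pos) {
            return names[pos];
        }

        public Class<?> getTypeAt(int pos) {
            return types[pos];
        }
    }

    /** Collects the field types strictly by position, never by name lookup. */
    public static Class<?>[] getFieldTypes(SimpleComposite type) {
        Class<?>[] result = new Class<?>[type.getArity()];
        for (int i = 0; i < result.length; i++) {
            result[i] = type.getTypeAt(i);
        }
        return result;
    }

    public static void main(String[] args) {
        SimpleComposite pojo = new SimpleComposite(
            new String[] {"age", "name"},
            new Class<?>[] {Integer.class, String.class});
        System.out.println(Arrays.toString(getFieldTypes(pojo)));
    }
}
```

Because both the name and the type at position `i` come from the same positional accessor, the two arrays cannot drift apart, whatever order the type uses internally.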




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775058#comment-15775058
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93818032
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
--- End diff --

The comment should be updated since AtomicType is also supported.
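
For readers less familiar with the reflection calls in `validateType` above, here is a minimal Java sketch of the same three conditions, using only `java.lang.reflect`. The names `TypeValidationSketch`, `isStaticAndGloballyAccessible`, `Inner`, and `Hidden` are made up for illustration and are not part of Flink.

```java
import java.lang.reflect.Modifier;

// Illustrative only: mirrors the three conditions checked by validateType in the diff.
final class TypeValidationSketch {

    /** Non-static inner class: it needs an enclosing instance, so it is rejected. */
    public class Inner { }

    /** Static but package-private nested class: rejected because it is not public. */
    static class Hidden { }

    /** Returns true only if the class passes all three checks from the diff. */
    static boolean isStaticAndGloballyAccessible(Class<?> clazz) {
        int modifiers = clazz.getModifiers();
        // A member class that is not static cannot be instantiated on its own.
        boolean nonStaticMember = clazz.isMemberClass() && !Modifier.isStatic(modifiers);
        // Local and anonymous classes have no canonical name.
        boolean hasCanonicalName = clazz.getCanonicalName() != null;
        return !nonStaticMember && Modifier.isPublic(modifiers) && hasCanonicalName;
    }
}
```

Under these assumptions `String.class` passes, while `Inner.class`, `Hidden.class`, and any anonymous class are rejected.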





[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775050#comment-15775050
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93817908
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
}
 
-   @Override
-   public TypeInformation[] getFieldTypes() {
-   return fieldTypes;
+   public int[] getFieldsIndexes() {
--- End diff --

It would be better to add a comment explaining why we do this. Also, rename 
`getFieldsIndexes` to `getFieldsIndices`.





[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15775053#comment-15775053
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93816695
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -26,24 +26,24 @@ import org.apache.flink.table.api.TableEnvironment
   * Schema information consists of a data type, field names, and corresponding indices of
   * these names in the data type.
   *
-  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
-  * field names and field indices are deducted from the returned type.
+  * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case
+  * field names and field indices are derived from the returned type.
   *
   * In case if custom field names are required one need to implement both
-  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
+  * [[TableSource#getFieldsNames]] and [[TableSource#getFieldsIndices]].
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
 
   /** Returns the names of the table fields. */
   def getFieldsNames: Array[String] = {
-TableEnvironment.getFieldInfo(getReturnType)._1
+TableEnvironment.getFieldNames(getReturnType)
   }
 
   /** Returns the indices of the table fields. */
-  def getFieldsIndices: Array[Int] = {
-getFieldsNames.indices.toArray
+  def getFieldsIndexes: Array[Int] = {
--- End diff --

The plural of index should be indices.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15774190#comment-15774190
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @wuchong 

I've updated the PR according to your comments. Could you please review it 
again?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773574#comment-15773574
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93797831
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -340,25 +331,8 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @return A tuple of two arrays holding the field names and corresponding field positions.
 */
   protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-  (Array[String], Array[Int]) =
-  {
-validateType(inputType)
-
-val fieldNames: Array[String] = inputType match {
-  case t: TupleTypeInfo[A] => t.getFieldNames
-  case c: CaseClassTypeInfo[A] => c.getFieldNames
-  case p: PojoTypeInfo[A] => p.getFieldNames
-  case r: RowTypeInfo => r.getFieldNames
-  case tpe =>
-throw new TableException(s"Type $tpe lacks explicit field naming")
-}
-val fieldIndexes = fieldNames.indices.toArray
-
-if (fieldNames.contains("*")) {
-  throw new TableException("Field name can not be '*'.")
-}
-
-(fieldNames, fieldIndexes)
+  (Array[String], Array[Int]) = {
+TableEnvironment.getFieldInfo(inputType)
--- End diff --

It is overridden in a subclass, so I decided to leave this method here and 
only move the body out of it to make it reusable.
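
A rough Java sketch of that refactoring, assuming a static helper class in place of the Scala companion object: the overridable instance method stays where it is and forwards to the shared implementation. `FieldInfoSketch` and `EnvironmentSketch` are hypothetical names, and the sketch only covers the index and `*` checks, not the type matching.

```java
// Stand-in for the companion object: one shared, stateless implementation.
final class FieldInfoSketch {
    private FieldInfoSketch() { }

    /** Returns positions 0..n-1 for the given names and rejects the name "*". */
    static int[] fieldIndices(String[] fieldNames) {
        for (String name : fieldNames) {
            if ("*".equals(name)) {
                throw new IllegalArgumentException("Field name can not be '*'.");
            }
        }
        int[] indices = new int[fieldNames.length];
        for (int i = 0; i < indices.length; i++) {
            indices[i] = i;
        }
        return indices;
    }
}

// Stand-in for TableEnvironment: subclasses can still override the method,
// but its body now lives in the reusable helper above.
class EnvironmentSketch {
    protected int[] fieldIndices(String[] fieldNames) {
        return FieldInfoSketch.fieldIndices(fieldNames);
    }
}
```

Other callers, such as a table source, can then use the helper directly without needing an environment instance.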




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15773176#comment-15773176
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93780163
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

Makes sense to me. It seems that we have to keep them as traits.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772879#comment-15772879
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93764349
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

`CsvTableSource` inherits from both `StreamTableSource` and 
`BatchTableSource`, so they have to be traits.

I don't think that adding a method that only calls the implementation from a 
trait is a big issue. In any case, we neither duplicate the code nor 
re-implement the methods.

Do you have any concerns about this?
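
The constraint can be shown with a Java 8 analogy, where interfaces with default methods play the role of Scala traits. This is only a sketch: all type names below are hypothetical stand-ins rather than the actual Flink classes, and it assumes Java 8 is available.

```java
// Hypothetical stand-ins; none of these are the actual Flink types.
interface TableSourceSketch {
    String[] getFieldsNames();

    // Shared implementation, comparable to a concrete method on a Scala trait.
    default int[] getFieldsIndices() {
        int[] indices = new int[getFieldsNames().length];
        for (int i = 0; i < indices.length; i++) {
            indices[i] = i;
        }
        return indices;
    }
}

interface BatchTableSourceSketch extends TableSourceSketch {
    default String batchLabel() { return "batch"; }
}

interface StreamTableSourceSketch extends TableSourceSketch {
    default String streamLabel() { return "stream"; }
}

// Legal only because both parents are interfaces. If they were abstract
// classes, a single class could extend just one of them.
final class CsvTableSourceSketch implements BatchTableSourceSketch, StreamTableSourceSketch {
    @Override
    public String[] getFieldsNames() {
        return new String[] {"id", "name", "price"};
    }
}
```

A source that serves both batch and streaming inherits the shared `getFieldsIndices` once, through both parents, without any duplication.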




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772868#comment-15772868
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93763884
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

What about making `TableSource` an abstract class? Then it would work with both 
Java and Scala without resorting to a hack.

In that case, `StreamTableSource` and `BatchTableSource` would have to be 
abstract classes too.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772859#comment-15772859
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93763491
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case

I'm not sure about that. My IDEA highlights `[[TableSource.getReturnType]]` 
as an error, but `[[TableSource#getReturnType]]` is fine. 

IDEA 2016.3.1, Scala plugin 2016.3.5




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772837#comment-15772837
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @wuchong 

Thank you for the review. I'll try to update the PR today.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772834#comment-15772834
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762178
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
 ---
@@ -44,14 +44,14 @@ abstract class FlinkTable[T](
 
   val fieldTypes: Array[TypeInformation[_]] =
--- End diff --

Ok, good point.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772836#comment-15772836
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762225
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,6 @@ import org.apache.flink.types.Row
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable(val tableSource: TableSource[_])
   extends FlinkTable[Row](
--- End diff --

Ok, good point.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772832#comment-15772832
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762127
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
 ---
@@ -38,7 +38,9 @@ class BatchTableSourceScan(
 
   override def deriveRowType() = {
 val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+flinkTypeFactory.buildRowDataType(
+  tableSource.getFieldsNames,
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
--- End diff --

This should be fine if we add support for `AtomicType` in 
`TableEnvironment.getFieldTypes`.
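
One way to picture that change, as a sketch only: a composite type contributes one entry per field, while an atomic type is treated as a single field. That treatment of atomic types is an assumption here, not something the thread spells out. The types below (`TypeSketch`, `AtomicTypeSketch`, `CompositeTypeSketch`, `FieldTypesSketch`) are hypothetical and deliberately much simpler than Flink's `TypeInformation` hierarchy.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified type model; not Flink's TypeInformation.
interface TypeSketch { }

final class AtomicTypeSketch implements TypeSketch {
    final String name;

    AtomicTypeSketch(String name) {
        this.name = name;
    }
}

final class CompositeTypeSketch implements TypeSketch {
    final List<TypeSketch> fieldTypes;

    CompositeTypeSketch(List<TypeSketch> fieldTypes) {
        this.fieldTypes = fieldTypes;
    }
}

final class FieldTypesSketch {
    private FieldTypesSketch() { }

    /** Returns the top-level field types; nested composites stay intact as one field. */
    static List<TypeSketch> fieldTypes(TypeSketch type) {
        if (type instanceof CompositeTypeSketch) {
            return ((CompositeTypeSketch) type).fieldTypes;
        }
        if (type instanceof AtomicTypeSketch) {
            // Assumption: an atomic type is exposed as a table with one field.
            return Collections.singletonList(type);
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }
}
```

Note that a composite field inside another composite is returned as a single entry, which is what keeps nested data nested instead of flattening it.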




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772830#comment-15772830
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762016
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,6 @@ import org.apache.flink.types.Row
 /** Table which defines an external table via a [[TableSource]] */
--- End diff --

Good point.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772829#comment-15772829
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761987
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
}
 
-   @Override
-   public TypeInformation[] getFieldTypes() {
-   return fieldTypes;
+   public int[] getFieldsIndices() {
+   return TableSource$class.getFieldsIndices(this);
--- End diff --

Ditto.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772828#comment-15772828
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761979
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

We do not override this method. As far as I understand, a Java class cannot 
inherit a method from a Scala trait if the trait provides an implementation: 
http://stackoverflow.com/a/7637888

Am I missing something?
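
The `TableSource$class.getFieldsNames(this)` call can be pictured with a hand-written Java imitation of how Scala 2.11 and earlier compile a trait that has a concrete method: an interface in which the method is still abstract, plus a helper class with a `$class` suffix that holds the body as a static method. All names below are hypothetical, and the classes are written by hand rather than generated by scalac.

```java
// What Java sees for the trait itself: every method is abstract.
interface SourceTraitSketch {
    String[] declaredNames();

    String[] getFieldsNames();
}

// Imitation of the compiler-generated helper that holds the trait's method body.
final class SourceTraitSketch$class {
    private SourceTraitSketch$class() { }

    static String[] getFieldsNames(SourceTraitSketch self) {
        return self.declaredNames().clone();
    }
}

// A Java implementor inherits nothing and has to forward explicitly.
final class JavaSourceSketch implements SourceTraitSketch {
    @Override
    public String[] declaredNames() {
        return new String[] {"id", "name"};
    }

    @Override
    public String[] getFieldsNames() {
        return SourceTraitSketch$class.getFieldsNames(this);
    }
}
```

A Scala class mixing in the trait gets this forwarding method generated automatically, which is why the boilerplate only shows up in Java implementations such as `KafkaTableSource`.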




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772825#comment-15772825
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761841
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
+  *
+  * In case if custom field names are required one need to implement both
+  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
 
-  /** Returns the number of fields of the table. */
-  def getNumberOfFields: Int
-
   /** Returns the names of the table fields. */
-  def getFieldsNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
+  def getFieldsNames: Array[String] = {
+TableEnvironment.getFieldInfo(getReturnType)._1
+  }
+
+  /** Returns the indices of the table fields. */
+  def getFieldsIndices: Array[Int] = {
+getFieldsNames.indices.toArray
--- End diff --

Ok, good point.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772822#comment-15772822
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761791
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
--- End diff --

Good point.


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772824#comment-15772824
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761823
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
--- End diff --

Aren't we using Scaladoc here? I thought the link syntax is different in Scaladoc.


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772819#comment-15772819
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761574
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
+*/
+  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
+validateType(inputType)
+
+inputType match {
+  case t: TupleTypeInfo[_] => getTypes(t)
+  case c: CaseClassTypeInfo[_] => getTypes(c)
+  case p: PojoTypeInfo[_] => getTypes(p)
+  case r: RowTypeInfo => getTypes(r)
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+  }
+
+  private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = {
+0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray
--- End diff --

Good point.


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772816#comment-15772816
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761556
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
+*/
+  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
+validateType(inputType)
+
+inputType match {
+  case t: TupleTypeInfo[_] => getTypes(t)
+  case c: CaseClassTypeInfo[_] => getTypes(c)
+  case p: PojoTypeInfo[_] => getTypes(p)
+  case r: RowTypeInfo => getTypes(r)
--- End diff --

Ok, will update.


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772813#comment-15772813
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761442
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
--- End diff --

Sure.
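
For reference, the accessibility check performed by `validateType` in the diff above can be tried out without Flink on the classpath. This is only a sketch: the object name `ValidateTypeSketch` and the method `checkClass` are hypothetical, and it returns an `Option` with the message instead of throwing a `TableException`. The calls on `Class` and `java.lang.reflect.Modifier` are the same ones the quoted code uses.

```scala
import java.lang.reflect.Modifier

object ValidateTypeSketch {

  /** Returns Some(message) if `clazz` is not static and globally accessible, None otherwise.
    * Hypothetical helper mirroring the condition quoted in the diff.
    */
  def checkClass(clazz: Class[_]): Option[String] = {
    val rejected =
      (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || // non-static inner class
        !Modifier.isPublic(clazz.getModifiers) ||                        // not publicly visible
        clazz.getCanonicalName == null                                   // local or anonymous class
    if (rejected) {
      Some(s"Class '$clazz' must be static and globally accessible.")
    } else {
      None
    }
  }
}
```

A public top-level class such as `classOf[String]` passes the check, so `ValidateTypeSketch.checkClass(classOf[String])` yields `None`.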


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772815#comment-15772815
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
--- End diff --

Ok, this makes sense.


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772811#comment-15772811
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761407
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
--- End diff --

Good point, will do that.
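
To make the behaviour of the quoted `getFieldInfo` easier to follow, here is a minimal sketch that runs without Flink. The types `TypeInfo`, `Composite` and `Atomic` are hypothetical stand-ins for the `TypeInformation` hierarchy, and `IllegalArgumentException` replaces `TableException`; only the control flow (extract names, reject `*`, derive positional indices) follows the diff.

```scala
object FieldInfoSketch {

  // Hypothetical stand-ins: a composite type carries field names, an atomic type does not.
  sealed trait TypeInfo
  final case class Composite(fieldNames: Array[String]) extends TypeInfo
  case object Atomic extends TypeInfo

  /** Returns the field names and their positions, failing for types without named fields. */
  def getFieldInfo(inputType: TypeInfo): (Array[String], Array[Int]) = {
    val fieldNames: Array[String] = inputType match {
      case Composite(names) => names
      case other =>
        throw new IllegalArgumentException(s"Type $other lacks explicit field naming")
    }

    // '*' is reserved, so it is rejected as a field name.
    if (fieldNames.contains("*")) {
      throw new IllegalArgumentException("Field name can not be '*'.")
    }

    // Indices are simply the positions 0 until n of the extracted names.
    (fieldNames, fieldNames.indices.toArray)
  }
}
```

For example, `FieldInfoSketch.getFieldInfo(FieldInfoSketch.Composite(Array("id", "name")))` returns the names together with the indices `0` and `1`, while passing `FieldInfoSketch.Atomic` throws.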


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772756#comment-15772756
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93758724
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,6 @@ import org.apache.flink.types.Row
 /** Table which defines an external table via a [[TableSource]] */
--- End diff --

Unused import: `org.apache.flink.api.java.typeutils.RowTypeInfo`.


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772754#comment-15772754
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93756907
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
--- End diff --

`TableSource.getReturnType` -> `TableSource#getReturnType`: use `#` instead of `.` in Javadoc.


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772758#comment-15772758
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93756487
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
+  *
+  * In case if custom field names are required one need to implement both
+  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
 
-  /** Returns the number of fields of the table. */
-  def getNumberOfFields: Int
-
   /** Returns the names of the table fields. */
-  def getFieldsNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
+  def getFieldsNames: Array[String] = {
+TableEnvironment.getFieldInfo(getReturnType)._1
+  }
+
+  /** Returns the indices of the table fields. */
+  def getFieldsIndices: Array[Int] = {
+getFieldsNames.indices.toArray
--- End diff --

I think this could be simplified to 
`getReturnType.getArity.indices.toArray`.
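
A rough, Flink-free sketch of this idea follows. Since `getArity` returns an `Int`, and Scala's `Int` has no `indices` member, the sketch expresses the simplification as the range `0 until arity`. The names `ReturnType`, `PersonSource` and `RenamedSource` are hypothetical; the trait only mimics the two default methods from the diff.

```scala
object TableSourceSketch {

  // Hypothetical stand-in for TypeInformation: only field names and arity are modeled.
  final case class ReturnType(fieldNames: Array[String]) {
    def getArity: Int = fieldNames.length
  }

  trait TableSource {
    def getReturnType: ReturnType

    /** Default: names are derived from the return type. */
    def getFieldsNames: Array[String] = getReturnType.fieldNames

    /** Default: positions 0 until arity, without computing the names first. */
    def getFieldsIndices: Array[Int] = (0 until getReturnType.getArity).toArray
  }

  // Relies on both defaults.
  object PersonSource extends TableSource {
    def getReturnType: ReturnType = ReturnType(Array("id", "name", "address"))
  }

  // Custom field names require overriding both methods so names and indices stay aligned.
  object RenamedSource extends TableSource {
    def getReturnType: ReturnType = ReturnType(Array("id", "name", "address"))
    override def getFieldsNames: Array[String] = Array("personName", "personId")
    override def getFieldsIndices: Array[Int] = Array(1, 0)
  }
}
```

With the defaults, `PersonSource.getFieldsIndices` holds `0, 1, 2`; `RenamedSource` exposes only two of the three fields, under new names and in a different order.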


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772757#comment-15772757
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93757413
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given [[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
+*/
+  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
+validateType(inputType)
+
+inputType match {
+  case t: TupleTypeInfo[_] => getTypes(t)
+  case c: CaseClassTypeInfo[_] => getTypes(c)
+  case p: PojoTypeInfo[_] => getTypes(p)
+  case r: RowTypeInfo => getTypes(r)
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+  }
+
+  private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = {
+0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray
--- End diff --

`c.getTypeAt(_)` can be simplified to `c.getTypeAt`.
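
A small sketch of the two equivalent forms, using a hypothetical `Composite` class with a single, non-overloaded `getTypeAt(pos: Int)`. Flink's `CompositeType` also has a `getTypeAt` variant that takes a field expression, so whether the shorter form compiles there depends on overload resolution, which this sketch does not exercise.

```scala
object EtaExpansionSketch {

  // Hypothetical stand-in for a composite type; field types are plain strings here.
  final class Composite(types: Vector[String]) {
    def getTotalFields: Int = types.length
    def getTypeAt(pos: Int): String = types(pos)
  }

  /** Explicit placeholder syntax, as in the diff. */
  def typesWithPlaceholder(c: Composite): Array[String] =
    0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray

  /** Method value: the compiler eta-expands `c.getTypeAt` into a function Int => String. */
  def typesWithMethodValue(c: Composite): Array[String] =
    0.until(c.getTotalFields).map(c.getTypeAt).toArray
}
```

Both methods return the same array for the same input; the second form just leaves the function conversion to the compiler.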


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772763#comment-15772763
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93757999
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -340,25 +331,8 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @return A tuple of two arrays holding the field names and corresponding field positions.
 */
   protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-  (Array[String], Array[Int]) =
-  {
-validateType(inputType)
-
-val fieldNames: Array[String] = inputType match {
-  case t: TupleTypeInfo[A] => t.getFieldNames
-  case c: CaseClassTypeInfo[A] => c.getFieldNames
-  case p: PojoTypeInfo[A] => p.getFieldNames
-  case r: RowTypeInfo => r.getFieldNames
-  case tpe =>
-throw new TableException(s"Type $tpe lacks explicit field naming")
-}
-val fieldIndexes = fieldNames.indices.toArray
-
-if (fieldNames.contains("*")) {
-  throw new TableException("Field name can not be '*'.")
-}
-
-(fieldNames, fieldIndexes)
+  (Array[String], Array[Int]) = {
+TableEnvironment.getFieldInfo(inputType)
--- End diff --

This seems redundant. We can remove this method and call the utility `TableEnvironment.getFieldInfo` directly.
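
The shape of that refactoring, sketched with hypothetical names (`Env`, `registerTable`) rather than Flink's real classes: the helper lives once in the companion object, and instance code calls it there instead of going through a forwarding instance method.

```scala
object CompanionUtilitySketch {

  // Hypothetical companion object holding the shared utility.
  object Env {
    def getFieldInfo(fieldNames: Array[String]): (Array[String], Array[Int]) =
      (fieldNames, fieldNames.indices.toArray)
  }

  // The class has no forwarding `getFieldInfo` of its own; call sites use `Env.getFieldInfo`.
  class Env {
    def registerTable(name: String, fieldNames: Array[String]): String = {
      val (names, indices) = Env.getFieldInfo(fieldNames)
      val columns = names.zip(indices).map { case (n, i) => i.toString + ":" + n }
      name + "(" + columns.mkString(", ") + ")"
    }
  }
}
```

Calling `new CompanionUtilitySketch.Env().registerTable("t", Array("a", "b"))` produces `t(0:a, 1:b)`.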


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772759#comment-15772759
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93758297
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
+*/
+  def getFieldTypes(inputType: TypeInformation[_]): 
Array[TypeInformation[_]] = {
+validateType(inputType)
+
+inputType match {
+  case t: TupleTypeInfo[_] => getTypes(t)
+  case c: CaseClassTypeInfo[_] => getTypes(c)
+  case p: PojoTypeInfo[_] => getTypes(p)
+  case r: RowTypeInfo => getTypes(r)
+  case tpe =>
--- End diff --

Same here: this should support `AtomicType`.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772765#comment-15772765
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93759808
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
 ---
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 
 abstract class FlinkTable[T](
-val typeInfo: TypeInformation[T],
+val typeInfo: TypeInformation[_],
--- End diff --

This can be reverted once you make `TableSourceTable` support a generic 
type.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772752#comment-15772752
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93755631
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
+*/
+  def getFieldTypes(inputType: TypeInformation[_]): 
Array[TypeInformation[_]] = {
+validateType(inputType)
+
+inputType match {
+  case t: TupleTypeInfo[_] => getTypes(t)
+  case c: CaseClassTypeInfo[_] => getTypes(c)
+  case p: PojoTypeInfo[_] => getTypes(p)
+  case r: RowTypeInfo => getTypes(r)
--- End diff --

What about using `case c: CompositeType => c.getFieldNames` instead of a 
separate case for every type?
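
For the field types, the single-case idea could look roughly like the sketch 
below. It is only an illustration: `TypeInfo`, `Atomic` and `Composite` are 
hypothetical stand-ins for Flink's type information classes so that the snippet 
compiles on its own, and `getArity` / `getTypeAt` are modeled on the accessors 
I would expect to use on a `CompositeType`.

```scala
object FieldTypesSketch {
  // Hypothetical stand-ins for Flink's TypeInformation hierarchy.
  sealed trait TypeInfo
  final case class Atomic(name: String) extends TypeInfo
  final case class Composite(fields: Seq[(String, TypeInfo)]) extends TypeInfo {
    def getArity: Int = fields.length
    def getTypeAt(pos: Int): TypeInfo = fields(pos)._2
  }

  // One case covers every composite type instead of one case per concrete class.
  def getFieldTypes(inputType: TypeInfo): Array[TypeInfo] = inputType match {
    case c: Composite =>
      (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
    case other =>
      throw new IllegalArgumentException(s"Type $other is not a composite type")
  }
}
```

With this shape, adding a new composite type does not require touching the 
match at all.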




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772760#comment-15772760
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93757105
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
}
 
-   @Override
-   public TypeInformation[] getFieldTypes() {
-   return fieldTypes;
+   public int[] getFieldsIndices() {
+   return TableSource$class.getFieldsIndices(this);
--- End diff --

Why do we do it like this? It seems that we do not need to override this 
method.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772762#comment-15772762
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93756642
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
--- End diff --

deducted -> derived?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772767#comment-15772767
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93757791
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
 ---
@@ -38,7 +38,9 @@ class BatchTableSourceScan(
 
   override def deriveRowType() = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, 
tableSource.getFieldTypes)
+flinkTypeFactory.buildRowDataType(
+  tableSource.getFieldsNames,
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
--- End diff --

What if the table source returns an `AtomicType`? It seems that it will 
fail here.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772770#comment-15772770
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93755496
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
--- End diff --

What about using `case c: CompositeType => c.getFieldNames` instead of a 
separate case for every type?




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772766#comment-15772766
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93756954
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
+  *
+  * In case if custom field names are required one need to implement both
+  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
--- End diff --

`TableSource.getFieldsNames` => `TableSource#getFieldsNames`
`TableSource.getFieldsIndices` => `TableSource#getFieldsIndices`

Use `#` instead of `.` when referencing methods in Javadoc.
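
To make the documented contract concrete, here is a minimal sketch of how I 
read it: implementing only `getReturnType` gives derived names and indices, 
while custom names require overriding both methods. `RowType`, `DerivedSource` 
and `RenamedSource` are hypothetical names used only for illustration, and the 
default method bodies are an assumption, not the code in this PR.

```scala
object TableSourceSketch {
  // Hypothetical stand-in for a composite TypeInformation.
  final case class RowType(fieldNames: Seq[String])

  trait TableSource {
    def getReturnType: RowType

    // Defaults: names and indices are derived from the return type.
    def getFieldsNames: Array[String] = getReturnType.fieldNames.toArray
    def getFieldsIndices: Array[Int] = getReturnType.fieldNames.indices.toArray
  }

  // Implements only getReturnType and inherits the derived schema.
  class DerivedSource extends TableSource {
    def getReturnType: RowType = RowType(Seq("f0", "f1"))
  }

  // Custom field names: both methods are overridden so that every
  // name is mapped to a position in the return type.
  class RenamedSource extends TableSource {
    def getReturnType: RowType = RowType(Seq("f0", "f1"))
    override def getFieldsNames: Array[String] = Array("amount", "user")
    override def getFieldsIndices: Array[Int] = Array(1, 0)
  }
}
```

Overriding only one of the two methods would leave names and indices out of 
sync, which is presumably why the doc asks for both.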




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772753#comment-15772753
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93758640
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
 ---
@@ -44,14 +44,14 @@ abstract class FlinkTable[T](
 
   val fieldTypes: Array[TypeInformation[_]] =
--- End diff --

We can use the util `getFieldTypes` to build the `TypeInformation` array and 
avoid the redundant code. The length check can then be moved above it.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772755#comment-15772755
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93756028
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
--- End diff --

Could you add a comment describing what this method does?
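
As a starting point, the three checks in the method could be documented along 
these lines. This is a sketch only: `isGloballyAccessible` is a hypothetical 
helper that returns a Boolean instead of throwing, and it takes a `Class` 
directly so that it runs without Flink on the classpath.

```scala
import java.lang.reflect.Modifier

object ValidateTypeSketch {
  /**
    * Returns true if the class is static and globally accessible, i.e. it is
    *  - not a non-static member (inner) class,
    *  - declared public, and
    *  - has a canonical name (rules out anonymous and local classes).
    */
  def isGloballyAccessible(clazz: Class[_]): Boolean = {
    val nonStaticMember =
      clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)
    val nonPublic = !Modifier.isPublic(clazz.getModifiers)
    val noCanonicalName = clazz.getCanonicalName == null
    !(nonStaticMember || nonPublic || noCanonicalName)
  }
}
```

The conditions are the same ones the diff combines with `||` before throwing 
the `TableException`.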




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772761#comment-15772761
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93757097
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

Why do we do it like this? It seems that we do not need to override this 
method.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772768#comment-15772768
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93759469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
--- End diff --

I think we should not restrict the input type to `CompositeType`, because 
`TableSource.getReturnType` could return any type, including `AtomicType`.




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772764#comment-15772764
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93758185
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
--- End diff --

We should extend this to support `AtomicType`. See 
`UserDefinedFunctionUtil.getFieldInfo(TypeInformation)`. I would also like to 
combine these methods.
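
Roughly what I have in mind, as a self-contained sketch. `TypeInfo`, `Atomic` 
and `Composite` are hypothetical stand-ins for the Flink classes, and exposing 
an atomic type as a single field named `f0` is an assumption about the default 
name, not something this PR defines.

```scala
object FieldInfoSketch {
  // Hypothetical stand-ins for Flink's TypeInformation hierarchy.
  sealed trait TypeInfo
  final case class Atomic(name: String) extends TypeInfo
  final case class Composite(fieldNames: Seq[String]) extends TypeInfo

  def getFieldInfo(inputType: TypeInfo): (Array[String], Array[Int]) = {
    val fieldNames: Array[String] = inputType match {
      case c: Composite => c.fieldNames.toArray
      // Assumption: an atomic type becomes one field with a default name.
      case _: Atomic => Array("f0")
    }

    if (fieldNames.contains("*")) {
      throw new IllegalArgumentException("Field name can not be '*'.")
    }

    // Positions simply follow the order of the field names.
    (fieldNames, fieldNames.indices.toArray)
  }
}
```

With the atomic case handled here, a table source that returns a plain atomic 
type no longer hits the "lacks explicit field naming" branch.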




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15772514#comment-15772514
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/3039

[FLINK-5280] Update TableSource to support nested data

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink nested-table-source

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3039.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3039


commit 7bd26239dd9a7c41f09fbe070baa70b19278c51f
Author: Ivan Mushketyk 
Date:   2016-12-22T21:26:34Z

[FLINK-5280] Update TableSource to support nested data






[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-22 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15771636#comment-15771636
 ] 

Jark Wu commented on FLINK-5280:


Sounds great. +1 for getting this into 1.2 if we can finish it in time.



[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770522#comment-15770522
 ] 

Fabian Hueske commented on FLINK-5280:
--

Maybe we can still squeeze this change into the 1.2.0 release. 
That would be good, since we are changing the interface of {{TableSource}}.



[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770509#comment-15770509
 ] 

Fabian Hueske commented on FLINK-5280:
--

Great, I just merged FLINK-5348 :-)



[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-22 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770498#comment-15770498
 ] 

Ivan Mushketyk commented on FLINK-5280:
---

Hi guys,

I like your idea, Fabian. I'll start working on it and see how it goes.



[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770065#comment-15770065
 ] 

Fabian Hueske commented on FLINK-5280:
--

It seems that these util methods have been implemented a couple of times. 

As I said, there is also {{TableEnvironment.getFieldInfo()}} for names and 
indices and {{FlinkTable}} for the types. It would be good to have all of that 
in one place.



[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-22 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15770018#comment-15770018
 ] 

Jark Wu commented on FLINK-5280:


Hi guys,

This is a very good extension of the discussion. It seems that I'm late to 
it :), but I will still post my ideas below.

I think {{getFieldTypes()}} and {{getNumberOfFields()}} can always be derived 
from {{getReturnType()}}. So I would like to move them into a util class, which 
keeps the interface clean. Actually, we already have 
{{UserDefinedFunctionUtil#getFieldInfo(TypeInformation)}} and 
{{TableEnvironment.getFieldInfo(TypeInformation)}}, which return field types, 
names, and indices. We can refactor them, move them to a better place, and 
maybe split them into separate variants (i.e. {{getFieldNames}}, 
{{getFieldIndicies}}, {{getFieldTypes}}).

We can then provide default implementations of {{getFieldNames}} and 
{{getFieldIndicies}} based on the util.
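
To illustrate why the types and the field count need not be part of the interface, here is a rough sketch of a util object that derives everything from a return type. `SimpleType`, `RowType`, and `ReturnTypeUtil` are hypothetical names invented for this example and do not correspond to Flink classes; treating an atomic type as a single field named `"f0"` is likewise an assumption.

```scala
// Hypothetical model of a (possibly nested) return type; NOT Flink's TypeInformation API.
sealed trait SimpleType
case object IntType extends SimpleType
case object StringType extends SimpleType
final case class RowType(fields: Vector[(String, SimpleType)]) extends SimpleType

// Hypothetical util: every piece of field metadata is computed from the return type alone.
object ReturnTypeUtil {

  def getFieldNames(returnType: SimpleType): Array[String] = returnType match {
    case RowType(fields) => fields.map(_._1).toArray
    case _               => Array("f0") // assumed name for a single atomic field
  }

  def getFieldTypes(returnType: SimpleType): Array[SimpleType] = returnType match {
    case RowType(fields) => fields.map(_._2).toArray
    case atomic          => Array[SimpleType](atomic)
  }

  def getFieldIndices(returnType: SimpleType): Array[Int] =
    getFieldNames(returnType).indices.toArray

  def getNumberOfFields(returnType: SimpleType): Int =
    getFieldTypes(returnType).length
}
```

For a row such as `RowType(Vector("name" -> StringType, "address" -> RowType(...)))`, the nested `address` row counts as one top-level field whose type is itself a `RowType`, so nested data needs no extra methods on the interface.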




[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15769949#comment-15769949
 ] 

Fabian Hueske commented on FLINK-5280:
--

How about the following:

* We make the {{TableEnvironment.getFieldInfo()}} methods statically 
accessible, i.e., we move them into the TableEnvironment companion object, and 
call them from the default implementations of 
{{TableSource.getFieldNames()}} and {{TableSource.getFieldIndicies()}}. This 
way we avoid code duplication.

* We remove {{getFieldTypes()}} and {{getNumberOfFields()}}. These methods are 
in fact not necessary for the interface. Have a look at how a regular DataSet 
is converted into a Table. The field types are extracted in {{FlinkTable}}. 
{{TableSourceTable}} overrides this logic. We could simply keep the 
{{FlinkTable}} logic. 
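
The first point could look roughly like the sketch below. It is self-contained and uses invented stand-ins: `ReturnType`, `Composite`, `Atomic`, `TableEnv` (standing in for the TableEnvironment companion object), `SketchTableSource`, and `PersonSource` are all hypothetical names, and the method names are only modeled on the ones discussed here.

```scala
// Hypothetical stand-ins; none of these are the real Flink types.
sealed trait ReturnType
final case class Composite(fieldNames: Array[String]) extends ReturnType
case object Atomic extends ReturnType

// Stands in for the TableEnvironment companion object holding the shared helper.
object TableEnv {
  def getFieldInfo(returnType: ReturnType): (Array[String], Array[Int]) = {
    val names = returnType match {
      case Composite(fieldNames) => fieldNames
      case Atomic                => Array("f0") // assumed default name
    }
    (names, names.indices.toArray)
  }
}

// The source only has to declare its return type; names and indices
// come from default implementations that delegate to the shared helper.
trait SketchTableSource {
  def getReturnType: ReturnType

  def getFieldNames: Array[String] = TableEnv.getFieldInfo(getReturnType)._1

  def getFieldIndices: Array[Int] = TableEnv.getFieldInfo(getReturnType)._2
}

// A concrete source overrides nothing but the return type.
class PersonSource extends SketchTableSource {
  override def getReturnType: ReturnType = Composite(Array("name", "age"))
}
```

A source with unusual naming could still override `getFieldNames` or `getFieldIndices`, while the common case inherits the defaults and the helper logic lives in one place.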


