[jira] [Comment Edited] (FLINK-6036) Let catalog support partition

2018-10-23 Thread jingzhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16660447#comment-16660447
 ] 

jingzhang edited comment on FLINK-6036 at 10/23/18 11:12 AM:
-

Hi [~xuefuz], I have just submitted a PR: https://github.com/apache/flink/pull/6906. Please have a look at it. Thanks.


was (Author: jinyu.zj):
Hi [~xuefuz], I just submitted a PR: https://github.com/apache/flink/pull/6906. 
Please have a look at it. Thanks.

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>Priority: Major
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations at 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6036) Let catalog support partition

2018-10-23 Thread jingzhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16660447#comment-16660447
 ] 

jingzhang commented on FLINK-6036:
--

Hi [~xuefuz], I just submitted a PR: https://github.com/apache/flink/pull/6906. 
Please have a look at it. Thanks.

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>Priority: Major
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations at 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6036) Let catalog support partition

2018-10-22 Thread jingzhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659932#comment-16659932
 ] 

jingzhang commented on FLINK-6036:
--

Hi [~xuefuz], I will submit a PR soon. Thanks.

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>Priority: Major
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations at 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-18 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-7636:
-
Description: 
At present, there are two ways to fetch the TableSource of a TableSourceScan node 
(e.g. LogicalTableSourceScan, PhysicalTableSourceScan, ...):
1. 
{code}
val relOptTable: RelOptTable = getTable()
val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
val tableSource = tableSourceTable.tableSource
{code}
The result of getTable() is currently an instance of RelOptTableImpl, and it will 
not change after the RelNode tree is built.
2. All TableSourceScan nodes currently take a tableSource as a constructor 
parameter, so we can fetch the tableSource from the node directly later.

After project push-down (PPD) or filter push-down (FPD) is applied, the two ways 
return different tableSource instances, which is very confusing.

We hope to fix the problem by introducing FlinkRelOptTable to replace 
RelOptTableImpl and removing the tableSource parameter from the TableSourceScan 
constructors. After PPD or FPD, a new FlinkRelOptTable instance which contains a 
new TableSourceTable will be passed to the TableSourceScan constructor.
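To make the proposal a bit more concrete, here is a minimal Scala sketch of the 
idea. The shapes below are simplified stand-ins, not the actual Flink or Calcite 
classes:
{code}
// Simplified stand-ins for illustration only.
trait TableSource
case class TableSourceTable(tableSource: TableSource)

// The proposed FlinkRelOptTable wraps an immutable TableSourceTable and can
// produce a copy that wraps a new one after projection/filter push-down.
case class FlinkRelOptTable(names: Seq[String], table: TableSourceTable) {
  def unwrapTableSourceTable: TableSourceTable = table
  def copyWith(newTable: TableSourceTable): FlinkRelOptTable = copy(table = newTable)
}

// The scan no longer stores a TableSource itself; it always goes through the
// table handle, so PPD/FPD rules build a new FlinkRelOptTable instead of
// passing a new TableSource to the scan constructor.
class TableSourceScan(val relOptTable: FlinkRelOptTable) {
  def tableSource: TableSource = relOptTable.unwrapTableSourceTable.tableSource
}
{code}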

  was:
At present, there are two ways to fetch the TableSource of a TableSourceScan node 
(e.g. LogicalTableSourceScan, PhysicalTableSourceScan, ...):
1. 
{code:scala}
val relOptTable: RelOptTable = getTable()
val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
val tableSource = tableSourceTable.tableSource
{code}
The result of getTable() is currently an instance of RelOptTableImpl, and it will 
not change after the RelNode tree is built.
2. All TableSourceScan nodes currently take a tableSource as a constructor 
parameter, so we can fetch the tableSource from the node directly later.

If project push-down (PPD) or filter push-down (FPD) is applied, the two ways 
return different tableSource instances, which is very confusing.

We hope to fix the problem by introducing FlinkRelOptTable to replace 
RelOptTableImpl and removing the tableSource parameter from the TableSourceScan 
constructors. After PPD or FPD, a new FlinkRelOptTable instance which contains a 
new TableSourceTable will be passed to the TableSourceScan constructor.


> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch TableSource of a TableSourceScan node 
> (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSouce = tableSourceTable.tableSource
> {code}
> the result of getTable() is instance of RelOptTableImpl now, and it will not 
> change after RelNode tree is built.
> 2. now all TableSourceScan contains a tablesource as constructor parameter, 
> so we could fetch the tablesource directly later.
>  
> The result tableSource is different with each other by above two ways after 
> apply project push(PPD) down or filter push down(FPD).  It is very confusing. 
> we hope to fix the problem by introducing FlinkRelOptTable to replace 
> RelOptTableImpl, and remove tableSource parameter from TableSourceScan's 
> constructor. After PPD or FPD,  a new FlinkRelOptTable instance which 
> contains a new TableSourceTable will be passed to TableSourceScan 
> constructor. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-18 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-7636:
-
Description: 
At present, there are two ways to fetch the TableSource of a TableSourceScan node 
(e.g. LogicalTableSourceScan, PhysicalTableSourceScan, ...):
1. 
{code:scala}
val relOptTable: RelOptTable = getTable()
val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
val tableSource = tableSourceTable.tableSource
{code}
The result of getTable() is currently an instance of RelOptTableImpl, and it will 
not change after the RelNode tree is built.
2. All TableSourceScan nodes currently take a tableSource as a constructor 
parameter, so we can fetch the tableSource from the node directly later.

If project push-down (PPD) or filter push-down (FPD) is applied, the two ways 
return different tableSource instances, which is very confusing.

We hope to fix the problem by introducing FlinkRelOptTable to replace 
RelOptTableImpl and removing the tableSource parameter from the TableSourceScan 
constructors. After PPD or FPD, a new FlinkRelOptTable instance which contains a 
new TableSourceTable will be passed to the TableSourceScan constructor.

> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch TableSource of a TableSourceScan node 
> (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code:scala}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSouce = tableSourceTable.tableSource
> {code}
> the result of getTable() is instance of RelOptTableImpl now, and it will not 
> change after RelNode tree is built.
> 2. now all TableSourceScan contains a tablesource as constructor parameter, 
> so we could fetch the tablesource directly later.
>  
> The result tableSource is different with each other by above two ways if 
> apply project push(PPD) down or filter push down(FPD).  It is very confusing. 
> we hope to fix the problem by introducing FlinkRelOptTable to replace 
> RelOptTableImpl, and remove tableSource parameter from TableSourceScan's 
> constructor. After PPD or FPD,  a new FlinkRelOptTable instance which 
> contains a new TableSourceTable will be passed to TableSourceScan 
> constructor. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-18 Thread jingzhang (JIRA)
jingzhang created FLINK-7636:


 Summary: Introduce Flink RelOptTable,  and remove tableSource from 
all TableSourceScan node constructor
 Key: FLINK-7636
 URL: https://issues.apache.org/jira/browse/FLINK-7636
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: jingzhang
Assignee: jingzhang






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-03-19 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang closed FLINK-5568.


> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} currently provides a mechanism to register temporary 
> tables. It registers the temp table in the Calcite catalog, so SQL and Table API 
> queries can access those temp tables. Currently DataSetTable, DataStreamTable 
> and TableSourceTable can be registered in the {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provide a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so that SQL and Table API queries can 
> access tables in the external catalogs without registering those tables in the 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are actually two kinds of catalogs in 
> Flink.
> The first one is the external catalog mentioned before; it provides CRUD 
> operations on databases/tables.
> The second one is the Calcite catalog; it defines the namespaces that can be 
> accessed in Calcite queries. It builds on the Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depend on the Calcite catalog to resolve the 
> tables referenced in SQL or Table API queries.
> So we need to do the following things:
> 1. Introduce an interface for the external catalog, and maybe provide an 
> in-memory implementation first for test and development environments.
> 2. Introduce a mechanism to connect the external catalog with the Calcite 
> catalog so that the tables/databases in the external catalog can be accessed 
> through the Calcite catalog. This includes converting databases of the 
> externalCatalog to Calcite sub-schemas and converting tables in a database of 
> the externalCatalog to Calcite tables (only {{TableSourceTable}} is supported).
> 3. Register the external catalog in the {{TableEnvironment}}.
> Here is the design of ExternalCatalogTable:
> || field || type || description ||
> | identifier | TableIdentifier | dbName and tableName of the table |
> | tableType | String | type of the external catalog table, e.g. csv, hbase, kafka |
> | schema | DataSchema | schema of the table data, including column names and column types |
> | partitionColumnNames | List | names of the partition columns |
> | properties | Map | properties of the external catalog table |
> | stats | TableStats | statistics of the external catalog table |
> | comment | String | |
> | create time | long | |
> There is still one detail that needs to be taken into consideration: how to 
> convert {{ExternalCatalogTable}} to {{TableSourceTable}}. The question is 
> equivalent to converting {{ExternalCatalogTable}} to {{TableSource}}, because 
> we can easily get a {{TableSourceTable}} from a {{TableSource}}.
> Different {{TableSource}} implementations need different fields to create an 
> instance. E.g. {{CsvTableSource}} needs path, fieldNames, fieldTypes, 
> fieldDelim, rowDelim and so on to create a new instance, while 
> {{KafkaTableSource}} needs a configuration and a tableName. So it is not a good 
> idea to let the Flink framework be responsible for translating 
> {{ExternalCatalogTable}} into the different kinds of {{TableSourceTable}}.
> Here is one solution: let {{TableSource}} specify a converter.
> 1. Provide an annotation named {{ExternalCatalogCompatible}}. A {{TableSource}} 
> with this annotation is compatible with the external catalog, that is, it can 
> be converted to or from an ExternalCatalogTable. The annotation specifies the 
> tableType and the converter of the tableSource. For example, for 
> {{CsvTableSource}}, it specifies that the tableType is csv and the converter 
> class is CsvTableSourceConverter.
> {code}
> @ExternalCatalogCompatible(tableType = "csv", converter = 
> classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
> ...}
> {code}
> 2. Scan all TableSources with the ExternalCatalogCompatible annotation and save 
> the tableType and converter in a Map.
> 3. When an {{ExternalCatalogTable}} needs to be converted to a {{TableSource}}, 
> look up the converter by tableType and let the converter do the conversion.
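As an aside, here is a minimal Scala sketch of steps 2 and 3 above. The names are 
hypothetical, and the registry is filled by hand here; the real proposal would 
populate it by scanning for the {{ExternalCatalogCompatible}} annotation:
{code}
// Simplified types for illustration only.
trait TableSource
trait ExternalCatalogTable { def tableType: String }
trait TableSourceConverter {
  def fromExternalCatalogTable(table: ExternalCatalogTable): TableSource
}

object TableSourceConverterRegistry {
  // tableType (e.g. "csv", "kafka") -> converter; populated manually here,
  // by an annotation scan in the real proposal.
  private var converters = Map.empty[String, TableSourceConverter]

  def register(tableType: String, converter: TableSourceConverter): Unit =
    converters += tableType -> converter

  // Step 3: look up the converter by tableType and let it do the conversion.
  def toTableSource(table: ExternalCatalogTable): TableSource =
    converters.getOrElse(
      table.tableType,
      throw new IllegalArgumentException(s"no converter for tableType '${table.tableType}'")
    ).fromExternalCatalogTable(table)
}
{code}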



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-03-19 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang resolved FLINK-5568.
--
Resolution: Fixed

> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} currently provides a mechanism to register temporary 
> tables. It registers the temp table in the Calcite catalog, so SQL and Table API 
> queries can access those temp tables. Currently DataSetTable, DataStreamTable 
> and TableSourceTable can be registered in the {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provide a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so that SQL and Table API queries can 
> access tables in the external catalogs without registering those tables in the 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are actually two kinds of catalogs in 
> Flink.
> The first one is the external catalog mentioned before; it provides CRUD 
> operations on databases/tables.
> The second one is the Calcite catalog; it defines the namespaces that can be 
> accessed in Calcite queries. It builds on the Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depend on the Calcite catalog to resolve the 
> tables referenced in SQL or Table API queries.
> So we need to do the following things:
> 1. Introduce an interface for the external catalog, and maybe provide an 
> in-memory implementation first for test and development environments.
> 2. Introduce a mechanism to connect the external catalog with the Calcite 
> catalog so that the tables/databases in the external catalog can be accessed 
> through the Calcite catalog. This includes converting databases of the 
> externalCatalog to Calcite sub-schemas and converting tables in a database of 
> the externalCatalog to Calcite tables (only {{TableSourceTable}} is supported).
> 3. Register the external catalog in the {{TableEnvironment}}.
> Here is the design of ExternalCatalogTable:
> || field || type || description ||
> | identifier | TableIdentifier | dbName and tableName of the table |
> | tableType | String | type of the external catalog table, e.g. csv, hbase, kafka |
> | schema | DataSchema | schema of the table data, including column names and column types |
> | partitionColumnNames | List | names of the partition columns |
> | properties | Map | properties of the external catalog table |
> | stats | TableStats | statistics of the external catalog table |
> | comment | String | |
> | create time | long | |
> There is still one detail that needs to be taken into consideration: how to 
> convert {{ExternalCatalogTable}} to {{TableSourceTable}}. The question is 
> equivalent to converting {{ExternalCatalogTable}} to {{TableSource}}, because 
> we can easily get a {{TableSourceTable}} from a {{TableSource}}.
> Different {{TableSource}} implementations need different fields to create an 
> instance. E.g. {{CsvTableSource}} needs path, fieldNames, fieldTypes, 
> fieldDelim, rowDelim and so on to create a new instance, while 
> {{KafkaTableSource}} needs a configuration and a tableName. So it is not a good 
> idea to let the Flink framework be responsible for translating 
> {{ExternalCatalogTable}} into the different kinds of {{TableSourceTable}}.
> Here is one solution: let {{TableSource}} specify a converter.
> 1. Provide an annotation named {{ExternalCatalogCompatible}}. A {{TableSource}} 
> with this annotation is compatible with the external catalog, that is, it can 
> be converted to or from an ExternalCatalogTable. The annotation specifies the 
> tableType and the converter of the tableSource. For example, for 
> {{CsvTableSource}}, it specifies that the tableType is csv and the converter 
> class is CsvTableSourceConverter.
> {code}
> @ExternalCatalogCompatible(tableType = "csv", converter = 
> classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
> ...}
> {code}
> 2. Scan all TableSources with the ExternalCatalogCompatible annotation and save 
> the tableType and converter in a Map.
> 3. When an {{ExternalCatalogTable}} needs to be converted to a {{TableSource}}, 
> look up the converter by tableType and let the converter do the conversion.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6067) DataSetCalc which contains filterCondition and projects would not be choose as best path in Batch TableAPI/SQL

2017-03-19 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang closed FLINK-6067.

Resolution: Fixed

> DataSetCalc which contains filterCondition and projects would not be choose 
> as best path in Batch TableAPI/SQL
> --
>
> Key: FLINK-6067
> URL: https://issues.apache.org/jira/browse/FLINK-6067
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> {code}
> val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
> {code}
> we run the above code in the Batch TableAPI/SQL, we would get the following 
> optimizedPlan
> {code}
> DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
>   DataSetCalc(select=[a, b, c], where=[<(a, 60)])
> DataSetScan(table=[[_DataSetTable_0]])
> {code}
> However, we run the above code in the Stream TableAPI/SQL, we would get the 
> following optimizedPlan
> {code}
> DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 
> 60)])
>   DataStreamScan(table=[[_DataStreamTable_0]])
> {code}
> we can find that in the batch tableAPI/SQL, DataSetCalc which contains 
> filterCondition and projects would not be choose as best path.
> The reason is:
> The cumulative cost of topNode of path DataSetCalc->DataSetCalc->DataSetScan 
> is DataSetCost{2001.0 rows, 2004.0 cpu, 0.0 io}
> The cumulative cost of topNode of  path DataSetCalc->DataSetScan is 
> DataSetCost{2000.0 rows, 6000.0 cpu, 0.0 io}
> based on isLe method of DataSetCost, compare io first, then cpu, then rows, 
> So DataSetCalc->DataSetCalc->DataSetScan is choose as best path.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6067) DataSetCalc which contains filterCondition and projects would not be choose as best path in Batch TableAPI/SQL

2017-03-19 Thread jingzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931593#comment-15931593
 ] 

jingzhang commented on FLINK-6067:
--

[~ykt836], it works, thanks a lot. I will close this JIRA.

> DataSetCalc which contains filterCondition and projects would not be choose 
> as best path in Batch TableAPI/SQL
> --
>
> Key: FLINK-6067
> URL: https://issues.apache.org/jira/browse/FLINK-6067
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> {code}
> val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
> {code}
> we run the above code in the Batch TableAPI/SQL, we would get the following 
> optimizedPlan
> {code}
> DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
>   DataSetCalc(select=[a, b, c], where=[<(a, 60)])
> DataSetScan(table=[[_DataSetTable_0]])
> {code}
> However, we run the above code in the Stream TableAPI/SQL, we would get the 
> following optimizedPlan
> {code}
> DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 
> 60)])
>   DataStreamScan(table=[[_DataStreamTable_0]])
> {code}
> we can find that in the batch tableAPI/SQL, DataSetCalc which contains 
> filterCondition and projects would not be choose as best path.
> The reason is:
> The cumulative cost of topNode of path DataSetCalc->DataSetCalc->DataSetScan 
> is DataSetCost{2001.0 rows, 2004.0 cpu, 0.0 io}
> The cumulative cost of topNode of  path DataSetCalc->DataSetScan is 
> DataSetCost{2000.0 rows, 6000.0 cpu, 0.0 io}
> based on isLe method of DataSetCost, compare io first, then cpu, then rows, 
> So DataSetCalc->DataSetCalc->DataSetScan is choose as best path.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6067) DataSetCalc which contains filterCondition and projects would not be choose as best path in Batch TableAPI/SQL

2017-03-17 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6067:
-
Description: 
{code}
val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
{code}
If we run the above code in the Batch Table API/SQL, we get the following 
optimized plan:
{code}
DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
  DataSetCalc(select=[a, b, c], where=[<(a, 60)])
DataSetScan(table=[[_DataSetTable_0]])
{code}
However, if we run the above code in the Stream Table API/SQL, we get the 
following optimized plan:
{code}
DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 60)])
  DataStreamScan(table=[[_DataStreamTable_0]])
{code}

We can see that in the Batch Table API/SQL, the DataSetCalc which contains both 
the filterCondition and the projects is not chosen as the best path.

The reason is:
The cumulative cost of the top node of the path 
DataSetCalc->DataSetCalc->DataSetScan is DataSetCost{2001.0 rows, 2004.0 cpu, 0.0 io}.
The cumulative cost of the top node of the path DataSetCalc->DataSetScan is 
DataSetCost{2000.0 rows, 6000.0 cpu, 0.0 io}.
Based on the isLe method of DataSetCost, which compares io first, then cpu, then 
rows, DataSetCalc->DataSetCalc->DataSetScan is chosen as the best path.
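To illustrate the comparison, here is a small stand-alone Scala sketch (a 
simplified stand-in, not the actual DataSetCost class) that mimics the 
io-then-cpu-then-rows ordering described above:
{code}
case class Cost(rows: Double, cpu: Double, io: Double) {
  // Compare io first, then cpu, then rows (simplified isLe).
  def isLe(that: Cost): Boolean =
    this.io < that.io ||
      (this.io == that.io && this.cpu < that.cpu) ||
      (this.io == that.io && this.cpu == that.cpu && this.rows <= that.rows)
}

// DataSetCalc -> DataSetCalc -> DataSetScan
val twoCalcs = Cost(rows = 2001.0, cpu = 2004.0, io = 0.0)
// DataSetCalc -> DataSetScan
val oneCalc = Cost(rows = 2000.0, cpu = 6000.0, io = 0.0)

// Equal io, lower cpu -> the two-Calc plan is considered cheaper.
println(twoCalcs.isLe(oneCalc)) // true
{code}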


  was:
{code}
val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
{code}
If we run the above code in the Batch Table API/SQL, we get the following 
optimized plan:
{code}
DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
  DataSetCalc(select=[a, b, c], where=[<(a, 60)])
DataSetScan(table=[[_DataSetTable_0]])
{code}
However, if we run the above code in the Stream Table API/SQL, we get the 
following optimized plan:
{code}
DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 60)])
  DataStreamScan(table=[[_DataStreamTable_0]])
{code}

We can see that in the Batch Table API/SQL, the DataSetCalc which contains both 
the filterCondition and the projectCondition is not chosen as the best path.


> DataSetCalc which contains filterCondition and projects would not be choose 
> as best path in Batch TableAPI/SQL
> --
>
> Key: FLINK-6067
> URL: https://issues.apache.org/jira/browse/FLINK-6067
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> {code}
> val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
> {code}
> we run the above code in the Batch TableAPI/SQL, we would get the following 
> optimizedPlan
> {code}
> DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
>   DataSetCalc(select=[a, b, c], where=[<(a, 60)])
> DataSetScan(table=[[_DataSetTable_0]])
> {code}
> However, we run the above code in the Stream TableAPI/SQL, we would get the 
> following optimizedPlan
> {code}
> DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 
> 60)])
>   DataStreamScan(table=[[_DataStreamTable_0]])
> {code}
> we can find that in the batch tableAPI/SQL, DataSetCalc which contains 
> filterCondition and projects would not be choose as best path.
> The reason is:
> The cumulative cost of topNode of path DataSetCalc->DataSetCalc->DataSetScan 
> is DataSetCost{2001.0 rows, 2004.0 cpu, 0.0 io}
> The cumulative cost of topNode of  path DataSetCalc->DataSetScan is 
> DataSetCost{2000.0 rows, 6000.0 cpu, 0.0 io}
> based on isLe method of DataSetCost, compare io first, then cpu, then rows, 
> So DataSetCalc->DataSetCalc->DataSetScan is choose as best path.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6067) ProjectNode and FilterNode cannot merge in Batch TableAPI/SQL

2017-03-17 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6067:
-
Description: 
{code}
val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
{code}
If we run the above code in the Batch Table API/SQL, we get the following 
optimized plan:
{code}
DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
  DataSetCalc(select=[a, b, c], where=[<(a, 60)])
DataSetScan(table=[[_DataSetTable_0]])
{code}
However, if we run the above code in the Stream Table API/SQL, we get the 
following optimized plan:
{code}
DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 60)])
  DataStreamScan(table=[[_DataStreamTable_0]])
{code}

We can see that in the Batch Table API/SQL, the DataSetCalc which contains both 
the filterCondition and the projectCondition is not chosen as the best path.

  was:
{code}
val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
{code}
If we run the above code in the Batch Table API/SQL, we get the following 
optimized plan:
{code}
DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
  DataSetCalc(select=[a, b, c], where=[<(a, 60)])
DataSetScan(table=[[_DataSetTable_0]])
{code}
However, if we run the above code in the Stream Table API/SQL, we get the 
following optimized plan:
{code}
DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 60)])
  DataStreamScan(table=[[_DataStreamTable_0]])
{code}

We can see that in the Batch Table API the project node and the filter node are 
not merged into a single node, whereas in the Stream Table API these two nodes 
are merged into one.


> ProjectNode and FilterNode cannot merge in Batch TableAPI/SQL
> -
>
> Key: FLINK-6067
> URL: https://issues.apache.org/jira/browse/FLINK-6067
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> {code}
> val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
> {code}
> we run the above code in the Batch TableAPI/SQL, we would get the following 
> optimizedPlan
> {code}
> DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
>   DataSetCalc(select=[a, b, c], where=[<(a, 60)])
> DataSetScan(table=[[_DataSetTable_0]])
> {code}
> However, we run the above code in the Stream TableAPI/SQL, we would get the 
> following optimizedPlan
> {code}
> DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 
> 60)])
>   DataStreamScan(table=[[_DataStreamTable_0]])
> {code}
> we can find that in the batch tableAPI/SQL, DataSetCalc which contains 
> filterCondition and projectCondition would not be choose as best path.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6067) DataSetCalc which contains filterCondition and projects would not be choose as best path in Batch TableAPI/SQL

2017-03-17 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6067:
-
Summary: DataSetCalc which contains filterCondition and projects would not 
be choose as best path in Batch TableAPI/SQL  (was: ProjectNode and FilterNode 
cannot merge in Batch TableAPI/SQL)

> DataSetCalc which contains filterCondition and projects would not be choose 
> as best path in Batch TableAPI/SQL
> --
>
> Key: FLINK-6067
> URL: https://issues.apache.org/jira/browse/FLINK-6067
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> {code}
> val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
> {code}
> we run the above code in the Batch TableAPI/SQL, we would get the following 
> optimizedPlan
> {code}
> DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
>   DataSetCalc(select=[a, b, c], where=[<(a, 60)])
> DataSetScan(table=[[_DataSetTable_0]])
> {code}
> However, we run the above code in the Stream TableAPI/SQL, we would get the 
> following optimizedPlan
> {code}
> DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 
> 60)])
>   DataStreamScan(table=[[_DataStreamTable_0]])
> {code}
> we can find that in the batch tableAPI/SQL, DataSetCalc which contains 
> filterCondition and projectCondition would not be choose as best path.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6067) ProjectNode and FilterNode cannot merge in Batch TableAPI/SQL

2017-03-17 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6067:
-
Description: 
{code}
val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
{code}
If we run the above code in the Batch Table API/SQL, we get the following 
optimized plan:
{code}
DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
  DataSetCalc(select=[a, b, c], where=[<(a, 60)])
DataSetScan(table=[[_DataSetTable_0]])
{code}
However, if we run the above code in the Stream Table API/SQL, we get the 
following optimized plan:
{code}
DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 60)])
  DataStreamScan(table=[[_DataStreamTable_0]])
{code}

We can see that in the Batch Table API the project node and the filter node are 
not merged into a single node, whereas in the Stream Table API these two nodes 
are merged into one.

  was:
{code}
val table1 = tEnv.scan( "tb1")
val table2 = tEnv.scan("tb2")
val result = table2
.where("d < 3")
.select('d *2, 'e, 'g.upperCase())
.unionAll(table1.select('a *2, 'b, 'c.upperCase()))
{code}
If we run the above code in the Batch Table API/SQL, we get the following 
optimized plan:
{code}
DataSetUnion(union=[_c0, e, _c2])
DataSetCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2])
DataSetCalc(select=[d, e, g], where=[<(d, 3)])
BatchTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataSetCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
BatchTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}
However, if we run the above code in the Stream Table API/SQL, we get the 
following optimized plan:
{code}
DataStreamUnion(union=[_c0, e, _c2])
DataStreamCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2], where=[<(d, 3)])
StreamTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataStreamCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
StreamTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}

We can see that in the Batch Table API the project node and the filter node are 
not merged into a single node, whereas in the Stream Table API these two nodes 
are merged into one.


> ProjectNode and FilterNode cannot merge in Batch TableAPI/SQL
> -
>
> Key: FLINK-6067
> URL: https://issues.apache.org/jira/browse/FLINK-6067
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> {code}
> val result = table.where('a < 60).select('a * 1.2,  'b / 2, 'c)
> {code}
> we run the above code in the Batch TableAPI/SQL, we would get the following 
> optimizedPlan
> {code}
> DataSetCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c])
>   DataSetCalc(select=[a, b, c], where=[<(a, 60)])
> DataSetScan(table=[[_DataSetTable_0]])
> {code}
> However, we run the above code in the Stream TableAPI/SQL, we would get the 
> following optimizedPlan
> {code}
> DataStreamCalc(select=[*(a, 1.2E0) AS _c0, /(b, 2) AS _c1, c], where=[<(a, 
> 60)])
>   DataStreamScan(table=[[_DataStreamTable_0]])
> {code}
> we can find that in the batch tableAPI, the project and filterNode don't 
> merge into a single node. However, in the Stream tableAPI, these two nodes 
> could merge into one.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6067) ProjectNode and FilterNode cannot merge in Batch TableAPI/SQL

2017-03-17 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6067:
-
Description: 
{code}
val table1 = tEnv.scan( "tb1")
val table2 = tEnv.scan("tb2")
val result = table2
.where("d < 3")
.select('d *2, 'e, 'g.upperCase())
.unionAll(table1.select('a *2, 'b, 'c.upperCase()))
{code}
If we run the above code in the Batch Table API/SQL, we get the following 
optimized plan:
{code}
DataSetUnion(union=[_c0, e, _c2])
DataSetCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2])
DataSetCalc(select=[d, e, g], where=[<(d, 3)])
BatchTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataSetCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
BatchTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}
However, if we run the above code in the Stream Table API/SQL, we get the 
following optimized plan:
{code}
DataStreamUnion(union=[_c0, e, _c2])
DataStreamCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2], where=[<(d, 3)])
StreamTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataStreamCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
StreamTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}

We can see that in the Batch Table API the project node and the filter node are 
not merged into a single node, whereas in the Stream Table API these two nodes 
are merged into one.

  was:
{code}
val table1 = tEnv.scan( "tb1")
val table2 = tEnv.scan("tb2")
val result = table2
.where("d < 3")
.select('d *2, 'e, 'g.upperCase())
.unionAll(table1.select('a *2, 'b, 'c.upperCase()))
{code}
If we run the above code in the Batch Table API, we get the following 
optimized plan:
{code}
DataSetUnion(union=[_c0, e, _c2])
DataSetCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2])
DataSetCalc(select=[d, e, g], where=[<(d, 3)])
BatchTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataSetCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
BatchTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}
However, if we run the above code in the Stream Table API, we get the 
following optimized plan:
{code}
DataStreamUnion(union=[_c0, e, _c2])
DataStreamCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2], where=[<(d, 3)])
StreamTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataStreamCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
StreamTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}

We can see that in the Batch Table API the project node and the filter node are 
not merged into a single node, whereas in the Stream Table API these two nodes 
are merged into one.


> ProjectNode and FilterNode cannot merge in Batch TableAPI/SQL
> -
>
> Key: FLINK-6067
> URL: https://issues.apache.org/jira/browse/FLINK-6067
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> {code}
> val table1 = tEnv.scan( "tb1")
> val table2 = tEnv.scan("tb2")
> val result = table2
> .where("d < 3")
> .select('d *2, 'e, 'g.upperCase())
> .unionAll(table1.select('a *2, 'b, 'c.upperCase()))
> {code}
> we run the above code in the Batch TableAPI/SQL, we would get the following 
> optimizedPlan
> {code}
> DataSetUnion(union=[_c0, e, _c2])
> DataSetCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2])
> DataSetCalc(select=[d, e, g], where=[<(d, 3)])
> BatchTableSourceScan(table=[[tb2]], fields=[d, e, g])
> DataSetCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
> BatchTableSourceScan(table=[[tb1]], fields=[a, b, c])
> {code}
> However, we run the above code in the Stream TableAPI/SQL, we would get the 
> following optimizedPlan
> {code}
> DataStreamUnion(union=[_c0, e, _c2])
> DataStreamCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2], where=[<(d, 3)])
> StreamTableSourceScan(table=[[tb2]], fields=[d, e, g])
> DataStreamCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
> StreamTableSourceScan(table=[[tb1]], fields=[a, b, c])
> {code}
> we can find that in the batch tableAPI, the project and filterNode don't 
> merge into a single node. However, in the Stream tableAPI, these two nodes 
> could merge into one.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6067) ProjectNode and FilterNode cannot merge in Batch TableAPI/SQL

2017-03-17 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6067:
-
Summary: ProjectNode and FilterNode cannot merge in Batch TableAPI/SQL  
(was: ProjectNode and FilterNode cannot merge in Batch TableAPI)

> ProjectNode and FilterNode cannot merge in Batch TableAPI/SQL
> -
>
> Key: FLINK-6067
> URL: https://issues.apache.org/jira/browse/FLINK-6067
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> {code}
> val table1 = tEnv.scan( "tb1")
> val table2 = tEnv.scan("tb2")
> val result = table2
> .where("d < 3")
> .select('d *2, 'e, 'g.upperCase())
> .unionAll(table1.select('a *2, 'b, 'c.upperCase()))
> {code}
> we run the above code in the BatchTableAPI, we would get the following 
> optimizedPlan
> {code}
> DataSetUnion(union=[_c0, e, _c2])
> DataSetCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2])
> DataSetCalc(select=[d, e, g], where=[<(d, 3)])
> BatchTableSourceScan(table=[[tb2]], fields=[d, e, g])
> DataSetCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
> BatchTableSourceScan(table=[[tb1]], fields=[a, b, c])
> {code}
> However, we run the above code in the Stream TableAPI, we would get the 
> following optimizedPlan
> {code}
> DataStreamUnion(union=[_c0, e, _c2])
> DataStreamCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2], where=[<(d, 3)])
> StreamTableSourceScan(table=[[tb2]], fields=[d, e, g])
> DataStreamCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
> StreamTableSourceScan(table=[[tb1]], fields=[a, b, c])
> {code}
> we can find that in the batch tableAPI, the project and filterNode don't 
> merge into a single node. However, in the Stream tableAPI, these two nodes 
> could merge into one.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6067) ProjectNode and FilterNode cannot merge in Batch TableAPI

2017-03-16 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6067:
-
Description: 
{code}
val table1 = tEnv.scan( "tb1")
val table2 = tEnv.scan("tb2")
val result = table2
.where("d < 3")
.select('d *2, 'e, 'g.upperCase())
.unionAll(table1.select('a *2, 'b, 'c.upperCase()))
{code}
If we run the above code in the Batch Table API, we get the following 
optimized plan:
{code}
DataSetUnion(union=[_c0, e, _c2])
DataSetCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2])
DataSetCalc(select=[d, e, g], where=[<(d, 3)])
BatchTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataSetCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
BatchTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}
However, if we run the above code in the Stream Table API, we get the 
following optimized plan:
{code}
DataStreamUnion(union=[_c0, e, _c2])
DataStreamCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2], where=[<(d, 3)])
StreamTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataStreamCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
StreamTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}

We can see that in the Batch Table API the project node and the filter node are 
not merged into a single node, whereas in the Stream Table API these two nodes 
are merged into one.

  was:
{code}
val table1 = tEnv.scan( "tb1")
val table2 = tEnv.scan("tb2")
val result = table2
.where("d < 3")
.select('d *2, 'e, 'g.upperCase())
.unionAll(table1.select('a *2, 'b, 'c.upperCase()))
{code}
If we run the above code in the Batch Table API, we get the following 
optimized plan:
{code}
DataSetUnion(union=[_c0, e, _c2])
DataSetCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2])
DataSetCalc(select=[d, e, g], where=[<(d, 3)])
BatchTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataSetCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
BatchTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}
However, if we run the above code in the Stream Table API, we get the 
following optimized plan:
{code}
DataStreamUnion(union=[_c0, e, _c2])
DataStreamCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2], where=[<(d, 3)])
StreamTableSourceScan(table=[[test, db2, tb2]], fields=[d, e, g])
DataStreamCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
StreamTableSourceScan(table=[[test, db1, tb1]], fields=[a, b, c])
{code}

We can see that in the Batch Table API the project node and the filter node are 
not merged into a single node, whereas in the Stream Table API these two nodes 
are merged into one.


> ProjectNode and FilterNode cannot merge in Batch TableAPI
> -
>
> Key: FLINK-6067
> URL: https://issues.apache.org/jira/browse/FLINK-6067
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> {code}
> val table1 = tEnv.scan( "tb1")
> val table2 = tEnv.scan("tb2")
> val result = table2
> .where("d < 3")
> .select('d *2, 'e, 'g.upperCase())
> .unionAll(table1.select('a *2, 'b, 'c.upperCase()))
> {code}
> we run the above code in the BatchTableAPI, we would get the following 
> optimizedPlan
> {code}
> DataSetUnion(union=[_c0, e, _c2])
> DataSetCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2])
> DataSetCalc(select=[d, e, g], where=[<(d, 3)])
> BatchTableSourceScan(table=[[tb2]], fields=[d, e, g])
> DataSetCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
> BatchTableSourceScan(table=[[tb1]], fields=[a, b, c])
> {code}
> However, we run the above code in the Stream TableAPI, we would get the 
> following optimizedPlan
> {code}
> DataStreamUnion(union=[_c0, e, _c2])
> DataStreamCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2], where=[<(d, 3)])
> StreamTableSourceScan(table=[[tb2]], fields=[d, e, g])
> DataStreamCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
> StreamTableSourceScan(table=[[tb1]], fields=[a, b, c])
> {code}
> we can find that in the batch tableAPI, the project and filterNode don't 
> merge into a single node. However, in the Stream tableAPI, these two nodes 
> could merge into one.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6067) ProjectNode and FilterNode cannot merge in Batch TableAPI

2017-03-16 Thread jingzhang (JIRA)
jingzhang created FLINK-6067:


 Summary: ProjectNode and FilterNode cannot merge in Batch TableAPI
 Key: FLINK-6067
 URL: https://issues.apache.org/jira/browse/FLINK-6067
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: jingzhang
Assignee: jingzhang


{code}
val table1 = tEnv.scan( "tb1")
val table2 = tEnv.scan("tb2")
val result = table2
.where("d < 3")
.select('d *2, 'e, 'g.upperCase())
.unionAll(table1.select('a *2, 'b, 'c.upperCase()))
{code}
If we run the above code in the Batch Table API, we get the following 
optimized plan:
{code}
DataSetUnion(union=[_c0, e, _c2])
DataSetCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2])
DataSetCalc(select=[d, e, g], where=[<(d, 3)])
BatchTableSourceScan(table=[[tb2]], fields=[d, e, g])
DataSetCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
BatchTableSourceScan(table=[[tb1]], fields=[a, b, c])
{code}
However, if we run the above code in the Stream Table API, we get the 
following optimized plan:
{code}
DataStreamUnion(union=[_c0, e, _c2])
DataStreamCalc(select=[*(d, 2) AS _c0, e, UPPER(g) AS _c2], where=[<(d, 3)])
StreamTableSourceScan(table=[[test, db2, tb2]], fields=[d, e, g])
DataStreamCalc(select=[*(a, 2) AS _c0, b, UPPER(c) AS _c2])
StreamTableSourceScan(table=[[test, db1, tb1]], fields=[a, b, c])
{code}

We can see that in the Batch Table API the project node and the filter node are 
not merged into a single node, whereas in the Stream Table API these two nodes 
are merged into one.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6036) Let catalog support partition

2017-03-15 Thread jingzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925770#comment-15925770
 ] 

jingzhang commented on FLINK-6036:
--

[~fhueske], could I just add the read partition methods to ExternalCatalog and 
the write partition methods to CrudExternalCatalog?
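For illustration, here is a rough Scala sketch of that split. All signatures 
below are hypothetical, not the final API:
{code}
// Hypothetical shapes for illustration only.
case class PartitionSpec(partitionValues: Map[String, String])
case class ExternalCatalogTablePartition(
  spec: PartitionSpec,
  properties: Map[String, String] = Map.empty)

// Read-only partition methods on ExternalCatalog.
trait ExternalCatalog {
  def getPartition(db: String, table: String, spec: PartitionSpec): ExternalCatalogTablePartition
  def listPartitions(db: String, table: String): Seq[PartitionSpec]
}

// Mutating partition methods on CrudExternalCatalog.
trait CrudExternalCatalog extends ExternalCatalog {
  def createPartition(db: String, table: String,
                      partition: ExternalCatalogTablePartition,
                      ignoreIfExists: Boolean): Unit
  def alterPartition(db: String, table: String,
                     partition: ExternalCatalogTablePartition,
                     ignoreIfNotExists: Boolean): Unit
  def dropPartition(db: String, table: String, spec: PartitionSpec,
                    ignoreIfNotExists: Boolean): Unit
}
{code}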

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations at 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6036) Let catalog support partition

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6036:
-
Description: 
The catalog currently only supports CRUD operations at the database and table 
level. But for some kinds of catalogs, for example Hive, we also need to do CRUD 
operations at the partition level.
This issue aims to let the catalog support partitions.

  was:
The catalog currently only supports CRUD operations at the database and table 
level. But for some kinds of catalogs, for example Hive, we also need to do CRUD 
operations on the partition level.
This issue aims to let the catalog support partitions.


> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations at 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6036) Let catalog support partition

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6036:
-
Description: 
The catalog currently only supports CRUD operations at the database and table 
level. But for some kinds of catalogs, for example Hive, we also need to do CRUD 
operations on the partition level.
This issue aims to let the catalog support partitions.

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations on 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6036) Let catalog support partition

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang reassigned FLINK-6036:


Assignee: jingzhang

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations on 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6037) the estimateRowCount method of DataSetCalc didn't work in SQL

2017-03-14 Thread jingzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924332#comment-15924332
 ] 

jingzhang commented on FLINK-6037:
--

[~fhueske], this issue is different from 
https://issues.apache.org/jira/browse/FLINK-5394; this issue only happens in SQL.
I agree that there should be no difference between Table API and SQL, since both 
are represented the same way at the optimization layer. However, when 
{{SqlToRelConverter}} is used to convert the SqlNode to a RelNode, the metadata 
provider is overridden from {{FlinkDefaultRelMetadataProvider}} back to 
{{DefaultRelMetadataProvider}} because of the following code:
{code}
  val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
  val config = SqlToRelConverter.configBuilder()
.withTrimUnusedFields(false).withConvertTableAccess(false).build()
  val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
new ViewExpanderImpl, validator, createCatalogReader, cluster, 
convertletTable, config)
{code}
So in the optimization phase, the Table API uses 
{{FlinkDefaultRelMetadataProvider}}, but SQL uses {{DefaultRelMetadataProvider}}.
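One possible direction (a sketch only, not a confirmed fix) is to set Flink's 
metadata provider on the newly created cluster before handing it to the 
SqlToRelConverter, so SQL goes through the same provider as the Table API. This 
assumes {{FlinkDefaultRelMetadataProvider}} exposes a reusable instance; the 
exact member name below is an assumption:
{code}
val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
// Re-install Flink's metadata provider on the fresh cluster (assumed member name).
cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE)
val config = SqlToRelConverter.configBuilder()
  .withTrimUnusedFields(false).withConvertTableAccess(false).build()
val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
  new ViewExpanderImpl, validator, createCatalogReader, cluster,
  convertletTable, config)
{code}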

> the estimateRowCount method of DataSetCalc didn't work in SQL
> -
>
> Key: FLINK-6037
> URL: https://issues.apache.org/jira/browse/FLINK-6037
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
> Fix For: 1.2.0
>
>
> The estimateRowCount method of DataSetCalc didn't work in the following 
> situation. 
> If I run the following code,
> {code}
> Table table = tableEnv.sql("select a, avg(a), sum(b), count(c) from t1 where 
> a==1 group by a");
> {code}
> the cost of every node in Optimized node tree is :
> {code}
> DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
> COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 
> 5000.0 cpu, 28000.0 io}
>   DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, 
> cumulative cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
>   DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
> cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
> {code}
> We expect the input rowcount of DataSetAggregate less than 1000, however the 
> actual input rowcount is still 1000 because the the estimateRowCount method 
> of DataSetCalc didn't work. 
> The problem is similar to the issue 
> https://issues.apache.org/jira/browse/FLINK-5394 which is already solved.
> I find although we set metadata provider to 
> {{FlinkDefaultRelMetadataProvider}} in {{FlinkRelBuilder}}, but after run 
> {code}planner.rel(...) {code} to translate SqlNode to RelNode, the  metadata 
> provider would be overrided from {{FlinkDefaultRelMetadataProvider}} to 
> {{DefaultRelMetadataProvider}} again because of the following code:
> {code}
>   val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
>   val config = SqlToRelConverter.configBuilder()
> .withTrimUnusedFields(false).withConvertTableAccess(false).build()
>   val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
> new ViewExpanderImpl, validator, createCatalogReader, cluster, 
> convertletTable, config)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6037) the estimateRowCount method of DataSetCalc didn't work in SQL

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6037:
-
Description: 
The estimateRowCount method of DataSetCalc didn't work in the following 
situation. 
If I run the following code,

{code}
Table table = tableEnv.sql("select a, avg(a), sum(b), count(c) from t1 where 
a==1 group by a");
{code}

the cost of every node in the optimized node tree is:

{code}
DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 5000.0 
cpu, 28000.0 io}
  DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, cumulative 
cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
  DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
{code}

We expect the input rowcount of DataSetAggregate to be less than 1000; however, 
the actual input rowcount is still 1000 because the estimateRowCount method of 
DataSetCalc didn't work.

The problem is similar to 
https://issues.apache.org/jira/browse/FLINK-5394, which is already solved.

I find that although we set the metadata provider to 
{{FlinkDefaultRelMetadataProvider}} in {{FlinkRelBuilder}}, after running 
{code}planner.rel(...){code} to translate the SqlNode to a RelNode, the metadata 
provider is overridden from {{FlinkDefaultRelMetadataProvider}} back to 
{{DefaultRelMetadataProvider}} because of the following code:
{code}
  val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
  val config = SqlToRelConverter.configBuilder()
.withTrimUnusedFields(false).withConvertTableAccess(false).build()
  val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
new ViewExpanderImpl, validator, createCatalogReader, cluster, 
convertletTable, config)
{code}

  was:
The estimateRowCount method of DataSetCalc didn't work in the following 
situation. 
If I run the following code,

{code}
Table table = tableEnv.sql("select a, avg(a), sum(b), count(c) from t1 where 
a==1 group by a");
{code}

the cost of every node in the optimized node tree is:

{code}
DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 5000.0 
cpu, 28000.0 io}
  DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, cumulative 
cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
  DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
{code}

We expect the input rowcount of DataSetAggregate to be less than 1000; however, 
the actual input rowcount is still 1000 because the estimateRowCount method of 
DataSetCalc didn't work.

The problem is similar to 
https://issues.apache.org/jira/browse/FLINK-5394, which is already solved.

I found that although we set the metadata provider to 
{{FlinkDefaultRelMetadataProvider}} in {{FlinkRelBuilder}}, after running 
{{planner.rel(...)}} to translate the SqlNode to a RelNode, the metadata provider 
is overridden from {{FlinkDefaultRelMetadataProvider}} to 
{{DefaultRelMetadataProvider}} again.


> the estimateRowCount method of DataSetCalc didn't work in SQL
> -
>
> Key: FLINK-6037
> URL: https://issues.apache.org/jira/browse/FLINK-6037
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
> Fix For: 1.2.0
>
>
> The estimateRowCount method of DataSetCalc didn't work in the following 
> situation. 
> If I run the following code,
> {code}
> Table table = tableEnv.sql("select a, avg(a), sum(b), count(c) from t1 where 
> a==1 group by a");
> {code}
> the cost of every node in Optimized node tree is :
> {code}
> DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
> COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 
> 5000.0 cpu, 28000.0 io}
>   DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, 
> cumulative cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
>   DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
> cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
> {code}
> We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
> actual input rowcount is still 1000 because the estimateRowCount method 
> of DataSetCalc didn't work. 
> The problem is similar to the issue 
> https://issues.apache.org/jira/browse/FLINK-5394 which is already solved.
> I found that although we set the metadata provider to 
> {{FlinkDefaultRelMetadataProvider}} in {{FlinkRelBuilder}}, after running 
> {{planner.rel(...)}} to translate the SqlNode to a RelNode, the metadata 
> provider is overridden from 

[jira] [Updated] (FLINK-6037) the estimateRowCount method of DataSetCalc didn't work in SQL

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6037:
-
Description: 
The estimateRowCount method of DataSetCalc didn't work in the following 
situation. 
If I run the following code,

{code}
Table table = tableEnv.sql("select a, avg(a), sum(b), count(c) from t1 where 
a==1 group by a");
{code}

the cost of every node in Optimized node tree is :

{code}
DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 5000.0 
cpu, 28000.0 io}
  DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, cumulative 
cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
  DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
{code}

We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
actual input rowcount is still 1000 because the estimateRowCount method of 
DataSetCalc didn't work. 

The problem is similar to the issue 
https://issues.apache.org/jira/browse/FLINK-5394 which is already solved.

I found that although we set the metadata provider to 
{{FlinkDefaultRelMetadataProvider}} in {{FlinkRelBuilder}}, after running 
{{planner.rel(...)}} to translate the SqlNode to a RelNode, the metadata provider 
is overridden from {{FlinkDefaultRelMetadataProvider}} to 
{{DefaultRelMetadataProvider}} again.

  was:
The estimateRowCount method of DataSetCalc didn't work in the following 
situation. 
If I run the following code,

{code}
Table table = tableEnv.sql("select a, avg(a), sum(b), count(c) from t1 where 
a==1 group by a");
{code}

the cost of every node in Optimized node tree is :

{code}
DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 5000.0 
cpu, 28000.0 io}
  DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, cumulative 
cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
  DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
{code}

We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
actual input rowcount is still 1000 because the estimateRowCount method of 
DataSetCalc didn't work. 

The problem is similar to the issue 
https://issues.apache.org/jira/browse/FLINK-5394 which is already solved.

I found that although we set the metadata provider to 
{{FlinkDefaultRelMetadataProvider}} in {{FlinkRelBuilder}}, after running 
{{planner.rel(...)}} to translate the SqlNode to a RelNode, the metadata provider 
is overridden from {{FlinkDefaultRelMetadataProvider}} to 
{{DefaultRelMetadataProvider}} again.


> the estimateRowCount method of DataSetCalc didn't work in SQL
> -
>
> Key: FLINK-6037
> URL: https://issues.apache.org/jira/browse/FLINK-6037
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
> Fix For: 1.2.0
>
>
> The estimateRowCount method of DataSetCalc didn't work in the following 
> situation. 
> If I run the following code,
> {code}
> Table table = tableEnv.sql("select a, avg(a), sum(b), count(c) from t1 where 
> a==1 group by a");
> {code}
> the cost of every node in Optimized node tree is :
> {code}
> DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
> COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 
> 5000.0 cpu, 28000.0 io}
>   DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, 
> cumulative cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
>   DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
> cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
> {code}
> We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
> actual input rowcount is still 1000 because the estimateRowCount method 
> of DataSetCalc didn't work. 
> The problem is similar to the issue 
> https://issues.apache.org/jira/browse/FLINK-5394 which is already solved.
> I found that although we set the metadata provider to 
> {{FlinkDefaultRelMetadataProvider}} in {{FlinkRelBuilder}}, after running 
> {{planner.rel(...)}} to translate the SqlNode to a RelNode, the metadata 
> provider is overridden from {{FlinkDefaultRelMetadataProvider}} to 
> {{DefaultRelMetadataProvider}} again.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6037) the estimateRowCount method of DataSetCalc didn't work in SQL

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6037:
-
Description: 
The estimateRowCount method of DataSetCalc didn't work in the following 
situation. 
If I run the following code,

{code}
Table table = tableEnv.sql("select a, avg(a), sum(b), count(c) from t1 where 
a==1 group by a");
{code}

the cost of every node in Optimized node tree is :

{code}
DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 5000.0 
cpu, 28000.0 io}
  DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, cumulative 
cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
  DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
{code}

We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
actual input rowcount is still 1000 because the estimateRowCount method of 
DataSetCalc didn't work. 

The problem is similar to the issue 
https://issues.apache.org/jira/browse/FLINK-5394 which is already solved.

I found that although we set the metadata provider to 
{{FlinkDefaultRelMetadataProvider}} in {{FlinkRelBuilder}}, after running 
{{planner.rel(...)}} to translate the SqlNode to a RelNode, the metadata provider 
is overridden from {{FlinkDefaultRelMetadataProvider}} to 
{{DefaultRelMetadataProvider}} again.

> the estimateRowCount method of DataSetCalc didn't work in SQL
> -
>
> Key: FLINK-6037
> URL: https://issues.apache.org/jira/browse/FLINK-6037
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
> Fix For: 1.2.0
>
>
> The estimateRowCount method of DataSetCalc didn't work in the following 
> situation. 
> If I run the following code,
> {code}
> Table table = tableEnv.sql("select a, avg(a), sum(b), count(c) from t1 where 
> a==1 group by a");
> {code}
> the cost of every node in Optimized node tree is :
> {code}
> DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
> COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 
> 5000.0 cpu, 28000.0 io}
>   DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, 
> cumulative cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
>   DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
> cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
> {code}
> We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
> actual input rowcount is still 1000 because the estimateRowCount method 
> of DataSetCalc didn't work. 
> The problem is similar to the issue 
> https://issues.apache.org/jira/browse/FLINK-5394 which is already solved.
> I found that although we set the metadata provider to 
> {{FlinkDefaultRelMetadataProvider}} in {{FlinkRelBuilder}}, after running 
> {{planner.rel(...)}} to translate the SqlNode to a RelNode, the metadata 
> provider is overridden from {{FlinkDefaultRelMetadataProvider}} to 
> {{DefaultRelMetadataProvider}} again.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5394) the estimateRowCount method of DataSetCalc didn't work in TableAPI

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5394:
-
Description: 
The estimateRowCount method of DataSetCalc doesn't work now. 
If I run the following code,

{code}
Table table = tableEnv
  .fromDataSet(data, "a, b, c")
  .where("a == 1")
  .groupBy("a")
  .select("a, a.avg, b.sum, c.count");
{code}

the cost of every node in Optimized node tree is :

{code}
DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 5000.0 
cpu, 28000.0 io}
  DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, cumulative 
cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
  DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
{code}

We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
actual input rowcount is still 1000 because the estimateRowCount method of 
DataSetCalc didn't work. 

There are two reasons for this:
1. We don't provide a custom metadataProvider yet. So when DataSetAggregate calls 
RelMetadataQuery.getRowCount(DataSetCalc) to estimate its input rowcount, the 
call is dispatched to RelMdRowCount.
2. DataSetCalc is a subclass of SingleRel. So the previous call matches 
getRowCount(SingleRel rel, RelMetadataQuery mq), which never uses 
DataSetCalc.estimateRowCount.

The same problem applies to all Flink RelNodes that are subclasses of SingleRel.

I plan to resolve this problem by adding a FlinkRelMdRowCount which contains 
specific getRowCount implementations for Flink RelNodes (a minimal sketch follows 
below).
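
A minimal sketch of such a handler, registered through Calcite's {{ReflectiveRelMetadataProvider}} (the import path of {{DataSetCalc}} and the exact shape of the final class are assumptions, not the committed implementation):
{code}
import org.apache.calcite.rel.metadata._
import org.apache.calcite.util.BuiltInMethod
// package path assumed for illustration
import org.apache.flink.table.plan.nodes.dataset.DataSetCalc

class FlinkRelMdRowCount extends MetadataHandler[BuiltInMetadata.RowCount] {

  override def getDef: MetadataDef[BuiltInMetadata.RowCount] = BuiltInMetadata.RowCount.DEF

  // the overload with the concrete DataSetCalc type is preferred over
  // getRowCount(SingleRel, RelMetadataQuery), so the node's own estimate is used
  def getRowCount(rel: DataSetCalc, mq: RelMetadataQuery): java.lang.Double =
    rel.estimateRowCount(mq)
}

object FlinkRelMdRowCount {
  val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
    BuiltInMethod.ROW_COUNT.method, new FlinkRelMdRowCount)
}
{code}
The new source could then be chained in front of Calcite's {{DefaultRelMetadataProvider}} (e.g. via {{ChainedRelMetadataProvider}}) inside {{FlinkDefaultRelMetadataProvider}}, so the specialized methods are tried first.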

  was:
The estimateRowCount method of DataSetCalc didn't work now. 
If I run the following code,

{code}
Table table = tableEnv
  .fromDataSet(data, "a, b, c")
  .groupBy("a")
  .select("a, a.avg, b.sum, c.count")
  .where("a == 1");
{code}

the cost of every node in Optimized node tree is :

{code}
DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 5000.0 
cpu, 28000.0 io}
  DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, cumulative 
cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
  DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
{code}

We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
actual input rowcount is still 1000 because the estimateRowCount method of 
DataSetCalc didn't work. 

There are two reasons for this:
1. We don't provide a custom metadataProvider yet. So when DataSetAggregate calls 
RelMetadataQuery.getRowCount(DataSetCalc) to estimate its input rowcount, the 
call is dispatched to RelMdRowCount.
2. DataSetCalc is a subclass of SingleRel. So the previous call matches 
getRowCount(SingleRel rel, RelMetadataQuery mq), which never uses 
DataSetCalc.estimateRowCount.

The same problem applies to all Flink RelNodes that are subclasses of SingleRel.

I plan to resolve this problem by adding a FlinkRelMdRowCount which contains 
specific getRowCount implementations for Flink RelNodes.


> the estimateRowCount method of DataSetCalc didn't work in TableAPI
> --
>
> Key: FLINK-5394
> URL: https://issues.apache.org/jira/browse/FLINK-5394
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
> Fix For: 1.2.0
>
>
> The estimateRowCount method of DataSetCalc didn't work now. 
> If I run the following code,
> {code}
> Table table = tableEnv
>   .fromDataSet(data, "a, b, c")
>   .where("a == 1")
>   .groupBy("a")
>   .select("a, a.avg, b.sum, c.count");
> {code}
> the cost of every node in Optimized node tree is :
> {code}
> DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
> COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 
> 5000.0 cpu, 28000.0 io}
>   DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, 
> cumulative cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
>   DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
> cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
> {code}
> We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
> actual input rowcount is still 1000 because the estimateRowCount method 
> of DataSetCalc didn't work. 
> There are two reasons for this:
> 1. We don't provide a custom metadataProvider yet. So when DataSetAggregate calls 
> RelMetadataQuery.getRowCount(DataSetCalc) to estimate its input rowcount, 
> the call is dispatched to RelMdRowCount.
> 2. DataSetCalc is a subclass of SingleRel. So the previous call 
> matches getRowCount(SingleRel rel, RelMetadataQuery 

[jira] [Updated] (FLINK-5394) the estimateRowCount method of DataSetCalc didn't work in TableAPI

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5394:
-
Summary: the estimateRowCount method of DataSetCalc didn't work in TableAPI 
 (was: the estimateRowCount method of DataSetCalc didn't work)

> the estimateRowCount method of DataSetCalc didn't work in TableAPI
> --
>
> Key: FLINK-5394
> URL: https://issues.apache.org/jira/browse/FLINK-5394
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
> Fix For: 1.2.0
>
>
> The estimateRowCount method of DataSetCalc didn't work now. 
> If I run the following code,
> {code}
> Table table = tableEnv
>   .fromDataSet(data, "a, b, c")
>   .groupBy("a")
>   .select("a, a.avg, b.sum, c.count")
>   .where("a == 1");
> {code}
> the cost of every node in Optimized node tree is :
> {code}
> DataSetAggregate(groupBy=[a], select=[a, AVG(a) AS TMP_0, SUM(b) AS TMP_1, 
> COUNT(c) AS TMP_2]): rowcount = 1000.0, cumulative cost = {3000.0 rows, 
> 5000.0 cpu, 28000.0 io}
>   DataSetCalc(select=[a, b, c], where=[=(a, 1)]): rowcount = 1000.0, 
> cumulative cost = {2000.0 rows, 2000.0 cpu, 0.0 io}
>   DataSetScan(table=[[_DataSetTable_0]]): rowcount = 1000.0, cumulative 
> cost = {1000.0 rows, 1000.0 cpu, 0.0 io}
> {code}
> We expect the input rowcount of DataSetAggregate to be less than 1000, however the 
> actual input rowcount is still 1000 because the estimateRowCount method 
> of DataSetCalc didn't work. 
> There are two reasons for this:
> 1. We don't provide a custom metadataProvider yet. So when DataSetAggregate calls 
> RelMetadataQuery.getRowCount(DataSetCalc) to estimate its input rowcount, 
> the call is dispatched to RelMdRowCount.
> 2. DataSetCalc is a subclass of SingleRel. So the previous call 
> matches getRowCount(SingleRel rel, RelMetadataQuery mq), which never uses 
> DataSetCalc.estimateRowCount.
> The same problem applies to all Flink RelNodes that are subclasses of 
> SingleRel.
> I plan to resolve this problem by adding a FlinkRelMdRowCount which contains 
> specific getRowCount implementations for Flink RelNodes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6037) the estimateRowCount method of DataSetCalc didn't work in SQL

2017-03-14 Thread jingzhang (JIRA)
jingzhang created FLINK-6037:


 Summary: the estimateRowCount method of DataSetCalc didn't work in 
SQL
 Key: FLINK-6037
 URL: https://issues.apache.org/jira/browse/FLINK-6037
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: jingzhang
Assignee: jingzhang






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6036) Let catalog support partition

2017-03-14 Thread jingzhang (JIRA)
jingzhang created FLINK-6036:


 Summary: Let catalog support partition
 Key: FLINK-6036
 URL: https://issues.apache.org/jira/browse/FLINK-6036
 Project: Flink
  Issue Type: Sub-task
Reporter: jingzhang






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5570) Support register external catalog to table environment

2017-02-23 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5570:
-
Description: 
This issue aims to support registering one or more {{ExternalCatalog}}s (which are 
introduced in https://issues.apache.org/jira/browse/FLINK-5568) with the 
{{TableEnvironment}}. After registration, SQL and Table API queries can access 
tables in the external catalogs without registering those tables one by one with 
the {{TableEnvironment}} beforehand.

We plan to add two APIs in {{TableEnvironment}}:
1. register externalCatalog
{code}
def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): 
Unit
{code}
2. scan a table from a registered catalog and return the resulting {{Table}}; 
this API is very useful in Table API queries (see the usage sketch below).
{code}
def scan(catalogName: String, tableIdentifier: TableIdentifier): Table
{code}
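
A hypothetical usage sketch of the two proposed APIs, assuming a {{tableEnv}} of type TableEnvironment is in scope ({{InMemoryExternalCatalog}}, the catalog name and the table/column names below are placeholders, not existing classes or committed names):
{code}
// register an external catalog once ...
val catalog: ExternalCatalog = new InMemoryExternalCatalog   // placeholder implementation
tableEnv.registerExternalCatalog("hive", catalog)

// ... then scan its tables directly, without registering them one by one
val orders: Table = tableEnv.scan("hive", TableIdentifier("db1", "orders"))
val result = orders.filter("amount > 10").select("id, amount")
{code}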

> Support register external catalog to table environment
> --
>
> Key: FLINK-5570
> URL: https://issues.apache.org/jira/browse/FLINK-5570
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> This issue aims to support registering one or more {{ExternalCatalog}}s (which are 
> introduced in https://issues.apache.org/jira/browse/FLINK-5568) with the 
> {{TableEnvironment}}. After registration, SQL and Table API queries can 
> access tables in the external catalogs without registering those tables one 
> by one with the {{TableEnvironment}} beforehand.
> We plan to add two APIs in {{TableEnvironment}}:
> 1. register externalCatalog
> {code}
> def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): 
> Unit
> {code}
> 2. scan a table from a registered catalog and return the resulting {{Table}}; 
> this API is very useful in Table API queries.
> {code}
> def scan(catalogName: String, tableIdentifier: TableIdentifier): Table
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-23 Thread jingzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880229#comment-15880229
 ] 

jingzhang edited comment on FLINK-5568 at 2/23/17 10:30 AM:


[~fhueske], thanks for your advice. 
Here are my thoughts on your questions; looking forward to your opinions.
1. {{ExternalCatalogTable}} is the table definition or description of the external 
catalog. 
{{ExternalCatalogTable}} does not extend {{FlinkTable}} ({{FlinkTable}} is the 
table of the Calcite catalog because it extends Calcite's Table), whereas 
{{ExternalCatalogTable}} is the table of the external catalog.
When {{CalciteCatalogReader}} looks up a table from the Calcite catalog, the 
Calcite schema first delegates to its underlying externalCatalog to look up the 
{{ExternalCatalogTable}} instance, then the Calcite schema returns a 
TableSourceTable which holds the TableSource that is generated by the 
converter from the {{ExternalCatalogTable}}.
2. Yes, it's better to move {{partitionColumnNames}} into {{properties}}.
3. Sorry for being unclear. We don't want to implement a new Schema 
class. In fact, we prefer to use Flink's representation. The DataSchema model is 
as follows:
{code}
case class DataSchema(
  columnTypes: Array[TypeInformation[_]],
  columnNames: Array[String])
{code}
4. It is important to know where to scan for the {{TableSource}}s that are 
annotated with {{@ExternalCatalogCompatible}}. We plan to depend on a 
configuration file (a rough sketch follows below).
 * let each connector specify the scan packages in an appointed configuration 
file (so if there is no such configuration file in a connector module, we would 
not try to scan {{TableSource}}s from this module)
 * look up all the resources with the given name via the classloader, and 
parse the scan-packages field.
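
A rough sketch of that lookup (the resource name {{tableSourceConverter.properties}} and the {{scan.packages}} key are placeholders, not decided names):
{code}
import java.net.URL
import java.util.Properties
import scala.collection.JavaConverters._

// collect the packages to scan for @ExternalCatalogCompatible table sources
// from every connector module that ships the (hypothetical) resource file
def scanPackages(classLoader: ClassLoader): Seq[String] = {
  val resources: Iterator[URL] =
    classLoader.getResources("tableSourceConverter.properties").asScala
  resources.flatMap { url =>
    val props = new Properties()
    val in = url.openStream()
    try props.load(in) finally in.close()
    Option(props.getProperty("scan.packages")).toSeq
      .flatMap(_.split(",")).map(_.trim).filter(_.nonEmpty)
  }.toSeq
}
{code}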

Looking forward to your advice, thanks.


was (Author: jinyu.zj):
[~fhueske], thanks for your advice. 
Here are my thoughts on your questions; looking forward to your opinions.
1. {{ExternalCatalogTable}} is the table definition or description of the external 
catalog. 
{{ExternalCatalogTable}} does not extend {{FlinkTable}} ({{FlinkTable}} is the 
table of the Calcite catalog because it extends Calcite's Table), whereas 
{{ExternalCatalogTable}} is the table of the external catalog.
When {{CalciteCatalogReader}} looks up a table from the Calcite catalog, the 
Calcite schema first delegates to its underlying externalCatalog to look up the 
{{ExternalCatalogTable}} instance, then the Calcite schema returns a 
TableSourceTable which holds the TableSource that is generated by the 
converter from the {{ExternalCatalogTable}}.
2. Yes, it's better to move {{partitionColumnNames}} into {{properties}}.
3. Sorry for being unclear. We don't want to implement a new Schema 
class. In fact, we prefer to use Flink's representation. The DataSchema model is 
as follows:
{code}
case class DataSchema(
  columnTypes: Array[TypeInformation[_]],
  columnNames: Array[String])
{code}
4. It is important to know where to scan for the {{TableSource}}s that are 
annotated with {{@ExternalCatalogCompatible}}. We plan to depend on a 
configuration file.
 * let each connector specify the scan packages in an appointed configuration 
file. 
 * look up all the resources with the given name via the classloader, and 
parse the scan-packages field.

Looking forward to your advices, thanks.

> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to those temp tables. Now DatasetTable,  DataStreamTable 
> and TableSourceTable can be registered to  {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provides a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables to 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter 

[jira] [Comment Edited] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-23 Thread jingzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880229#comment-15880229
 ] 

jingzhang edited comment on FLINK-5568 at 2/23/17 10:28 AM:


[~fhueske], thanks for your advice. 
Here are my thoughts on your questions; looking forward to your opinions.
1. {{ExternalCatalogTable}} is the table definition or description of the external 
catalog. 
{{ExternalCatalogTable}} does not extend {{FlinkTable}} ({{FlinkTable}} is the 
table of the Calcite catalog because it extends Calcite's Table), whereas 
{{ExternalCatalogTable}} is the table of the external catalog.
When {{CalciteCatalogReader}} looks up a table from the Calcite catalog, the 
Calcite schema first delegates to its underlying externalCatalog to look up the 
{{ExternalCatalogTable}} instance, then the Calcite schema returns a 
TableSourceTable which holds the TableSource that is generated by the 
converter from the {{ExternalCatalogTable}}.
2. Yes, it's better to move {{partitionColumnNames}} into {{properties}}.
3. Sorry for being unclear. We don't want to implement a new Schema 
class. In fact, we prefer to use Flink's representation. The DataSchema model is 
as follows:
{code}
case class DataSchema(
  columnTypes: Array[TypeInformation[_]],
  columnNames: Array[String])
{code}
4. It is important to know where to scan for the {{TableSource}}s that are 
annotated with {{@ExternalCatalogCompatible}}. We plan to depend on a 
configuration file.
 * let each connector specify the scan packages in an appointed configuration 
file. 
 * look up all the resources with the given name via the classloader, and 
parse the scan-packages field.

Looking forward to your advices, thanks.


was (Author: jinyu.zj):
[~fhueske], thanks for your advice. 
Here are my thoughts on your questions; looking forward to your opinions.
1. {{ExternalCatalogTable}} is the table definition or description of the external 
catalog. 
{{ExternalCatalogTable}} does not extend {{FlinkTable}} ({{FlinkTable}} is the 
table of the Calcite catalog because it extends Calcite's Table), whereas 
{{ExternalCatalogTable}} is the table of the external catalog.
When {{CalciteCatalogReader}} looks up a table from the Calcite catalog, the 
Calcite schema first looks up the {{ExternalCatalogTable}} instance from the 
underlying externalCatalog, then returns a TableSourceTable which holds the 
TableSource that is generated by the converter from the {{ExternalCatalogTable}}.
2. Yes, it's better to move {{partitionColumnNames}} into {{properties}}.
3. Sorry for being unclear. We don't want to implement a new Schema 
class. In fact, we prefer to use Flink's representation. The DataSchema model is 
as follows:
{code}
case class DataSchema(
  columnTypes: Array[TypeInformation[_]],
  columnNames: Array[String])
{code}
4. It is important to know where to scan for the {{TableSource}}s that are 
annotated with {{@ExternalCatalogCompatible}}. We plan to depend on a 
configuration file.
 * let each connector specify the scan packages in an appointed configuration 
file. 
 * look up all the resources with the given name via the classloader, and 
parse the scan-packages field.

Looking forward to your advices, thanks.

> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to those temp tables. Now DatasetTable,  DataStreamTable 
> and TableSourceTable can be registered to  {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provides a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables to 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for 

[jira] [Commented] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-23 Thread jingzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880229#comment-15880229
 ] 

jingzhang commented on FLINK-5568:
--

[~fhueske], thanks for your advice. 
Here are my thoughts on your questions; looking forward to your opinions.
1. {{ExternalCatalogTable}} is the table definition or description of the external 
catalog. 
{{ExternalCatalogTable}} does not extend {{FlinkTable}} ({{FlinkTable}} is the 
table of the Calcite catalog because it extends Calcite's Table), whereas 
{{ExternalCatalogTable}} is the table of the external catalog.
When {{CalciteCatalogReader}} looks up a table from the Calcite catalog, the 
Calcite schema first looks up the {{ExternalCatalogTable}} instance from the 
underlying externalCatalog, then returns a TableSourceTable which holds the 
TableSource that is generated by the converter from the {{ExternalCatalogTable}}.
2. Yes, it's better to move {{partitionColumnNames}} into {{properties}}.
3. Sorry for being unclear. We don't want to implement a new Schema 
class. In fact, we prefer to use Flink's representation. The DataSchema model is 
as follows:
{code}
case class DataSchema(
  columnTypes: Array[TypeInformation[_]],
  columnNames: Array[String])
{code}
4. It is important to know where to scan for the {{TableSource}}s that are 
annotated with {{@ExternalCatalogCompatible}}. We plan to depend on a 
configuration file.
 * let each connector specify the scan packages in an appointed configuration 
file. 
 * look up all the resources with the given name via the classloader, and 
parse the scan-packages field.

Looking forward to your advices, thanks.

> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to those temp tables. Now DatasetTable,  DataStreamTable 
> and TableSourceTable can be registered to  {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provides a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables to 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, 
> convert tables in a database of externalCatalog to  Calcite tables (only 
> support {{TableSourceTable}}).
> 3. register external catalog to {{TableEnvironment}}.
> Here is the design mode of ExternalCatalogTable.
> |  identifier  | TableIdentifier | dbName and tableName 
> of table |
> |  tableType | String | type of external catalog table, 
> e.g csv, hbase, kafka
> |  schema| DataSchema|  schema of table data, 
> including column names and column types
> | partitionColumnNames | List | names of partition column
> | properties  | Map |properties of 
> external catalog table
> | stats   | TableStats | statistics of external 
> catalog table 
> | comment | String | 
> | create time | long
> There is still a detail problem need to be take into consideration, that is , 
> how to convert  {{ExternalCatalogTable}} to {{TableSourceTable}}. The 
> question is equals to  convert  {{ExternalCatalogTable}} to {{TableSource}} 
> because we could  easily get {{TableSourceTable}} from {{TableSource}}.
> Because different {{TableSource}} often contains different fields to initiate 
> an instance. E.g. {{CsvTableSource}}  needs path, fieldName, fieldTypes, 

[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary tables. 
It registers the temp table in the Calcite catalog, so SQL and Table API queries can 
access those temp tables. Currently DatasetTable, DataStreamTable and 
TableSourceTable can be registered with the {{TableEnvironment}} as temporary tables.

This issue aims to provide a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and Table API queries can access 
tables in the external catalogs without registering those tables with the 
{{TableEnvironment}} beforehand.

First, we should point out that there are actually two kinds of catalog in Flink. 
The first one is the external catalog, as mentioned before; it provides CRUD 
operations on databases/tables.
The second one is the Calcite catalog; it defines the namespace that can be accessed 
in Calcite queries and depends on the Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depend on the Calcite catalog to fetch the tables in SQL or 
Table API.

So we need to do the following things:
1. introduce an interface for the external catalog, and maybe provide an in-memory 
implementation first for test and development environments.
2. introduce a mechanism to connect the external catalog with the Calcite catalog so 
the tables/databases in the external catalog can be accessed in the Calcite catalog. 
This includes converting databases of the externalCatalog to Calcite sub-schemas and 
converting tables in a database of the externalCatalog to Calcite tables (only 
{{TableSourceTable}} is supported); a sketch follows after this list.
3. register the external catalog with the {{TableEnvironment}}.
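
A sketch only of point 2 (the {{ExternalCatalog}} method names and the converter hook are assumptions, not a committed API): databases become Calcite sub-schemas and their tables are exposed as Calcite tables.
{code}
import org.apache.calcite.schema.{Schema, Table}
import org.apache.calcite.schema.impl.AbstractSchema
import scala.collection.JavaConverters._

// Exposes an external catalog as a Calcite schema: one sub-schema per database.
// `toCalciteTable` stands for the converter lookup (e.g. via the
// @ExternalCatalogCompatible registry) that produces a TableSourceTable.
class ExternalCatalogSchema(
    catalog: ExternalCatalog,                       // interface proposed in this issue
    toCalciteTable: ExternalCatalogTable => Table)  // placeholder for the converter
  extends AbstractSchema {

  override def getSubSchemaMap: java.util.Map[String, Schema] =
    catalog.listDatabases()                         // assumed method name
      .map { db =>
        db -> (new ExternalCatalogDatabaseSchema(catalog, db, toCalciteTable): Schema)
      }.toMap.asJava
}

// Exposes one database: every ExternalCatalogTable becomes a Calcite table.
class ExternalCatalogDatabaseSchema(
    catalog: ExternalCatalog,
    db: String,
    toCalciteTable: ExternalCatalogTable => Table)
  extends AbstractSchema {

  override def getTableMap: java.util.Map[String, Table] =
    catalog.listTables(db)                          // assumed method name
      .map(name => name -> toCalciteTable(catalog.getTable(db, name)))
      .toMap.asJava
}
{code}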

Here is the design model of ExternalCatalogTable.

| identifier | TableIdentifier | dbName and tableName of the table |
| tableType | String | type of the external catalog table, e.g. csv, hbase, kafka |
| schema | DataSchema | schema of the table data, including column names and column types |
| partitionColumnNames | List | names of the partition columns |
| properties | Map | properties of the external catalog table |
| stats | TableStats | statistics of the external catalog table |
| comment | String | |
| create time | long | |

There is still a detail that needs to be taken into consideration, that is, 
how to convert an {{ExternalCatalogTable}} to a {{TableSourceTable}}. The question 
is equivalent to converting an {{ExternalCatalogTable}} to a {{TableSource}}, because 
we can easily get a {{TableSourceTable}} from a {{TableSource}}.

Different {{TableSource}}s often require different fields to instantiate. 
E.g. {{CsvTableSource}} needs path, fieldName, fieldTypes, 
fieldDelim, rowDelim and so on to create a new instance, while {{KafkaTableSource}} 
needs a configuration and a tableName to create a new instance. So it's not a good 
idea to let the Flink framework be responsible for translating an 
{{ExternalCatalogTable}} to the different kinds of {{TableSourceTable}}.

Here is one solution: let the {{TableSource}} specify a converter.
1. Provide an annotation named {{ExternalCatalogCompatible}}. A 
{{TableSource}} with this annotation is compatible with the external 
catalog, that is, it can be converted to or from an ExternalCatalogTable. The 
annotation specifies the tableType and converter of the tableSource. For 
example, for {{CsvTableSource}}, it specifies that the tableType is csv and the 
converter class is CsvTableSourceConverter.
{code}
@ExternalCatalogCompatible(tableType = "csv", converter = 
classOf[CsvTableSourceConverter])
class CsvTableSource(...) {
...}
{code}
2. Scan all TableSources with the ExternalCatalogCompatible annotation and save 
the tableType and converter in a Map.
3. When an {{ExternalCatalogTable}} needs to be converted to a {{TableSource}}, look 
up the converter by tableType and let the converter do the conversion (a sketch of 
such a converter follows below).
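
A hypothetical sketch of such a converter for {{CsvTableSource}} (the {{TableSourceConverter}} trait, its method name and the property keys are assumptions, not decided API; the {{CsvTableSource}} constructor arguments follow the fields listed above):
{code}
// import paths as of the flink-table module, assumed for illustration
import org.apache.flink.table.sources.{CsvTableSource, TableSource}

// assumed converter interface: one implementation per compatible TableSource
trait TableSourceConverter[T <: TableSource[_]] {
  def fromExternalCatalogTable(table: ExternalCatalogTable): T
}

class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {

  override def fromExternalCatalogTable(table: ExternalCatalogTable): CsvTableSource = {
    val props = table.properties                 // the Map from the design model above
    new CsvTableSource(
      props("path"),                             // property keys are placeholders
      table.schema.columnNames,
      table.schema.columnTypes,
      props.getOrElse("fieldDelim", ","),
      props.getOrElse("rowDelim", "\n"))
  }
}
{code}
The framework would then keep a {{Map[String, TableSourceConverter[_]]}} keyed by tableType (filled while scanning the annotated classes) and dispatch to the matching converter when a table of that type is looked up.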

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite 

[jira] [Commented] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879755#comment-15879755
 ] 

jingzhang commented on FLINK-5568:
--

[~fhueske], thanks for your response. There is still a detail that needs to be 
discussed, that is, how to convert an ExternalCatalogTable to a TableSourceTable. I 
added the question and one solution to the description. Looking forward to your 
advice.

> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to those temp tables. Now DatasetTable,  DataStreamTable 
> and TableSourceTable can be registered to  {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provides a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables to 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, 
> convert tables in a database of externalCatalog to  Calcite tables (only 
> support {{TableSourceTable}}).
> 3. register external catalog to {{TableEnvironment}}.
> There is still a detail problem need to be take into consideration, that is , 
> how to convert  {{ExternalCatalogTable}} to {{TableSourceTable}}. The 
> question is equals to  convert  {{ExternalCatalogTable}} to {{TableSource}} 
> because we could  easily get {{TableSourceTable}} from {{TableSource}}.
> Because different {{TableSource}} often contains different fields to initiate 
> an instance. E.g. {{CsvTableSource}}  needs path, fieldName, fieldTypes, 
> fieldDelim, rowDelim and so on to create a new instance , 
> {{KafkaTableSource}} needs configuration and tableName to create a new 
> instance. So it's not a good idea to let Flink framework be responsible for 
> translate  {{ExternalCatalogTable}} to different kind of 
> {{TableSourceTable}}. 
> Here is one solution. Let {{TableSource}} specify a converter.
> 1. provide  an Annatition named {{ExternalCatalogCompatible}}. The 
> {{TableSource}} with the annotation means it is compatible with external 
> catalog, that is, it could be converted to or from ExternalCatalogTable. This 
> annotation specifies the tabletype and converter of the tableSource. For 
> example, for {{CsvTableSource}}, it specifies the tableType is csv and 
> converter class is CsvTableSourceConverter.
> {code}
> @ExternalCatalogCompatible(tableType = "csv", converter = 
> classOf[CsvTableSourceConverter])
> class CsvTableSource(...) {
> ...}
> {code}
> 2. Scan all TableSources with the ExternalCatalogCompatible annotation, save 
> the tableType and converter in a Map
> 3. When need to convert {{ExternalCatalogTable}} to {{TableSource}} , get the 
> converter based on tableType. and let converter do convert



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue aims to provide a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and Table API queries can access 
tables in the external catalogs without registering those tables with the 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail that needs to be taken into consideration, that is, 
how to convert an {{ExternalCatalogTable}} to a {{TableSourceTable}}. The question 
is equivalent to converting an {{ExternalCatalogTable}} to a {{TableSource}}, because 
we can easily get a {{TableSourceTable}} from a {{TableSource}}.

Different {{TableSource}}s often require different fields to instantiate. 
E.g. {{CsvTableSource}} needs path, fieldName, fieldTypes, 
fieldDelim, rowDelim and so on to create a new instance, while {{KafkaTableSource}} 
needs a configuration and a tableName to create a new instance. So it's not a good 
idea to let the Flink framework be responsible for translating an 
{{ExternalCatalogTable}} to the different kinds of {{TableSourceTable}}.

Here is one solution: let the {{TableSource}} specify a converter.
1. Provide an annotation named {{ExternalCatalogCompatible}}. A 
{{TableSource}} with this annotation is compatible with the external 
catalog, that is, it can be converted to or from an ExternalCatalogTable. The 
annotation specifies the tableType and converter of the tableSource. For 
example, for {{CsvTableSource}}, it specifies that the tableType is csv and the 
converter class is CsvTableSourceConverter.
{code}
@ExternalCatalogCompatible(tableType = "csv", converter = 
classOf[CsvTableSourceConverter])
class CsvTableSource(...) {
...}
{code}
2. Scan all TableSources with the ExternalCatalogCompatible annotation and save 
the tableType and converter in a Map.
3. When an {{ExternalCatalogTable}} needs to be converted to a {{TableSource}}, look 
up the converter by tableType and let the converter do the conversion.

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail problem 

[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue aims to provide a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and Table API queries can access 
tables in the external catalogs without registering those tables with the 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail that needs to be taken into consideration, that is, 
how to convert an {{ExternalCatalogTable}} to a {{TableSourceTable}}. The question 
is equivalent to converting an {{ExternalCatalogTable}} to a {{TableSource}}, because 
we can easily get a {{TableSourceTable}} from a {{TableSource}}.

Different {{TableSource}}s often require different fields to instantiate. 
E.g. {{CsvTableSource}} needs path, fieldName, fieldTypes, 
fieldDelim, rowDelim and so on to create a new instance, while {{KafkaTableSource}} 
needs a configuration and a tableName to create a new instance. So it's not a good 
idea to let the Flink framework be responsible for translating an 
{{ExternalCatalogTable}} to the different kinds of {{TableSourceTable}}.

Here is one solution: let the {{TableSource}} specify a converter.
1. Provide an annotation named ExternalCatalogCompatible. A {{TableSource}} 
with this annotation is compatible with the external catalog, that is, it 
can be converted to or from an ExternalCatalogTable. The annotation specifies 
the tableType and converter of the tableSource. For example, for 
{{CsvTableSource}}, it specifies that the tableType is csv and the converter class is 
CsvTableSourceConverter.
{code}
@ExternalCatalogCompatible(tableType = "csv", converter = 
classOf[CsvTableSourceConverter])
class CsvTableSource(...) {
...}
{code}
2. Scan all TableSources with the ExternalCatalogCompatible annotation and save 
the tableType and converter in a Map.
3. When an {{ExternalCatalogTable}} needs to be converted to a {{TableSource}}, look 
up the converter by tableType and let the converter do the conversion.

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail problem 

[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue aims to provide a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and Table API queries can access 
tables in the external catalogs without registering those tables with the 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
{{TableSourceTable}}).
3. register external catalog to {{TableEnvironment}}.


There is still a detail that needs to be taken into consideration, that is, 
how to convert an {{ExternalCatalogTable}} to a {{TableSourceTable}}. The question 
is equivalent to converting an {{ExternalCatalogTable}} to a {{TableSource}}, because 
we can easily get a {{TableSourceTable}} from a {{TableSource}}.

Different {{TableSource}}s often require different fields to instantiate. 
E.g. {{CsvTableSource}} needs path, fieldName, fieldTypes, 
fieldDelim, rowDelim and so on to create a new instance, while {{KafkaTableSource}} 
needs a configuration and a tableName to create a new instance. So it's not a good 
idea to let the Flink framework be responsible for translating an 
{{ExternalCatalogTable}} to the different kinds of {{TableSourceTable}}.

Here is my thought: let the {{TableSource}} specify a converter.
1. Provide an annotation named ExternalCatalogCompatible. A {{TableSource}} 
with this annotation is compatible with the external catalog, that is, it 
can be converted to or from an ExternalCatalogTable. The annotation specifies 
the tableType and converter of the tableSource. For example, for 
{{CsvTableSource}}, it specifies that the tableType is csv and the converter class is 
CsvTableSourceConverter.
{code}
@ExternalCatalogCompatible(tableType = "csv", converter = classOf[CsvTableSourceConverter])
class CsvTableSource(...) {
  ...
}
{code}
2. Scan all TableSources with the ExternalCatalogCompatible annotation and save 
the tableType and converter in a Map.
3. When we need to convert an {{ExternalCatalogTable}} to a {{TableSource}}, get the 
converter based on the tableType and let the converter do the conversion (see the 
sketch below).
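
A minimal, self-contained sketch of the converter side could look like the following. 
The {{TableSource}} and {{ExternalCatalogTable}} types below are stand-ins trimmed to 
what the example needs, and the converter trait, registry and CSV converter are assumed 
names for illustration, not the classes this proposal will finally introduce.
{code}
import scala.collection.mutable

// Stand-ins so the sketch is self-contained; not the real Flink/Calcite types.
trait TableSource
case class ExternalCatalogTable(tableType: String, properties: Map[String, String])

// One converter per tableType, referenced by the ExternalCatalogCompatible annotation.
trait TableSourceConverter[T <: TableSource] {
  def fromExternalCatalogTable(table: ExternalCatalogTable): T
}

// Steps 2/3: registry from tableType to converter. In the proposal the map would be
// filled by scanning the classpath for @ExternalCatalogCompatible table sources;
// the scanning itself is omitted here.
object TableSourceConverters {
  private val converters = mutable.Map[String, TableSourceConverter[_ <: TableSource]]()

  def register(tableType: String, converter: TableSourceConverter[_ <: TableSource]): Unit =
    converters(tableType) = converter

  def toTableSource(table: ExternalCatalogTable): TableSource =
    converters.get(table.tableType) match {
      case Some(converter) => converter.fromExternalCatalogTable(table)
      case None =>
        throw new IllegalArgumentException(s"no converter for table type '${table.tableType}'")
    }
}

// Example: a CSV converter reading its fields from the table properties.
case class CsvTableSource(path: String, fieldDelim: String) extends TableSource

class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {
  override def fromExternalCatalogTable(table: ExternalCatalogTable): CsvTableSource =
    CsvTableSource(
      path = table.properties("path"),
      fieldDelim = table.properties.getOrElse("fieldDelim", ","))
}

// Usage:
//   TableSourceConverters.register("csv", new CsvTableSourceConverter)
//   val source = TableSourceConverters.toTableSource(
//     ExternalCatalogTable("csv", Map("path" -> "/tmp/data.csv")))
{code}
Keying the lookup by tableType is what lets new table sources plug in without the 
framework having to know each concrete {{TableSource}} class.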

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to those temp tables. Now DatasetTable,  DataStreamTable and 
TableSourceTable can be registered to  {{TableEnvironment}} as temporary tables.

This issue wants to provides a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
to tables in the external catalogs without register those tables to 
{{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables (only support 
TableSourceTable).
3. register external catalog to {{TableEnvironment}}.


> Introduce interface for catalog, and 

[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary tables. 
It registers the temp table in the Calcite catalog, so SQL and TableAPI queries can 
access those temp tables. Now DatasetTable, DataStreamTable and 
TableSourceTable can be registered in the {{TableEnvironment}} as temporary tables.

This issue wants to provide a mechanism to connect external catalogs such as 
HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could access 
tables in the external catalogs without registering those tables in the 
{{TableEnvironment}} beforehand.

First, we should point out that there are actually two kinds of catalog in Flink. 
The first one is the external catalog as we mentioned before; it provides CRUD 
operations on databases/tables.
The second one is the Calcite catalog; it defines the namespace that can be accessed in 
Calcite queries. It depends on the Calcite Schema/Table abstraction. The SqlValidator 
and SqlConverter depend on the Calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce an interface for the external catalog, and maybe provide an in-memory 
implementation first for test and development environments.
2. introduce a mechanism to connect the external catalog with the Calcite catalog so that 
the tables/databases in the external catalog can be accessed in the Calcite catalog. 
This includes converting databases of the externalCatalog to Calcite sub-schemas and 
converting tables in a database of the externalCatalog to Calcite tables (only 
TableSourceTable is supported).
3. register the external catalog to the {{TableEnvironment}}.

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to thoese temp tables.

This issue wants to provides a mechanism to connector external catalogs, so SQL 
and TableAPI queries could access to tables in the external catalogs without 
register those tables to {{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables.
3. register external catalog to {{TableEnvironment}}.


> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to those temp tables. Now DatasetTable,  DataStreamTable 
> and TableSourceTable can be registered to  {{TableEnvironment}} as temporary 
> tables.
> This issue wants to provides a mechanism to connect external catalogs such as 
> HCatalog to the {{TableEnvironment}}, so SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables to 
> {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can 

[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary tables. 
It registers the temp table in the Calcite catalog, so SQL and TableAPI queries can 
access those temp tables.

This issue wants to provide a mechanism to connect external catalogs, so SQL 
and TableAPI queries could access tables in the external catalogs without 
registering those tables in the {{TableEnvironment}} beforehand.

First, we should point out that there are actually two kinds of catalog in Flink. 
The first one is the external catalog as we mentioned before; it provides CRUD 
operations on databases/tables.
The second one is the Calcite catalog; it defines the namespace that can be accessed in 
Calcite queries. It depends on the Calcite Schema/Table abstraction. The SqlValidator 
and SqlConverter depend on the Calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce an interface for the external catalog, and maybe provide an in-memory 
implementation first for test and development environments.
2. introduce a mechanism to connect the external catalog with the Calcite catalog so that 
the tables/databases in the external catalog can be accessed in the Calcite catalog. 
This includes converting databases of the externalCatalog to Calcite sub-schemas and 
converting tables in a database of the externalCatalog to Calcite tables.
3. register the external catalog to the {{TableEnvironment}}.

  was:
The {{TableEnvironment}} now provides a mechanism to register temporary table. 
It registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to thoese temp tables.

This issue wants to provides an mechanism to connector external catalogs, so 
SQL and TableAPI queries could access to tables in the external catalogs 
without register those tables to {{TableEnvironment}} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables.
3. register external catalog to {{TableEnvironment}}.


> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to thoese temp tables.
> This issue wants to provides a mechanism to connector external catalogs, so 
> SQL and TableAPI queries could access to tables in the external catalogs 
> without register those tables to {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, 
> convert tables in a database of externalCatalog to  Calcite tables.
> 3. register external catalog to {{TableEnvironment}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {{TableEnvironment}} now provides a mechanism to register temporary tables. 
It registers the temp table in the Calcite catalog, so SQL and TableAPI queries can 
access those temp tables.

This issue wants to provide a mechanism to connect external catalogs, so SQL 
and TableAPI queries could access tables in the external catalogs without 
registering those tables in the {{TableEnvironment}} beforehand.

First, we should point out that there are actually two kinds of catalog in Flink. 
The first one is the external catalog as we mentioned before; it provides CRUD 
operations on databases/tables.
The second one is the Calcite catalog; it defines the namespace that can be accessed in 
Calcite queries. It depends on the Calcite Schema/Table abstraction. The SqlValidator 
and SqlConverter depend on the Calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce an interface for the external catalog, and maybe provide an in-memory 
implementation first for test and development environments.
2. introduce a mechanism to connect the external catalog with the Calcite catalog so that 
the tables/databases in the external catalog can be accessed in the Calcite catalog. 
This includes converting databases of the externalCatalog to Calcite sub-schemas and 
converting tables in a database of the externalCatalog to Calcite tables.
3. register the external catalog to the {{TableEnvironment}}.

  was:
The {TableEnvironment} now provides a mechanism to register temporary table. It 
registers the temp table to calcite catalog, so SQL and TableAPI queries can 
access to thoese temp tables.

This issue wants to provides an mechanism to connector external catalogs, so 
SQL and TableAPI queries could access to tables in the external catalogs 
without register those tables to {TableEnvironment} beforehand.

First, we should point out that there are two kinds of catalog in Flink 
actually. 
The first one is external catalog as we mentioned before, it provides CRUD 
operations to databases/tables.
The second one is calcite catalog, it defines namespace that can be accessed in 
Calcite queries. It depends on Calcite Schema/Table abstraction. SqlValidator 
and SqlConverter depends on the calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce interface for external catalog, maybe provide an in-memory 
implementation first for test and develop environment.
2. introduce a mechanism to connect external catalog with Calcite catalog so 
the tables/databases in external catalog can be accessed in Calcite catalog. 
Including convert databases of externalCatalog to Calcite sub-schemas, convert 
tables in a database of externalCatalog to  Calcite tables.
3. register external catalog to {TableEnvironment}.


> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {{TableEnvironment}} now provides a mechanism to register temporary 
> table. It registers the temp table to calcite catalog, so SQL and TableAPI 
> queries can access to thoese temp tables.
> This issue wants to provides an mechanism to connector external catalogs, so 
> SQL and TableAPI queries could access to tables in the external catalogs 
> without register those tables to {{TableEnvironment}} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, 
> convert tables in a database of externalCatalog to  Calcite tables.
> 3. register external catalog to {{TableEnvironment}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, and integrate with calcite schema

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Description: 
The {TableEnvironment} now provides a mechanism to register temporary tables. It 
registers the temp table in the Calcite catalog, so SQL and TableAPI queries can 
access those temp tables.

This issue wants to provide a mechanism to connect external catalogs, so SQL 
and TableAPI queries could access tables in the external catalogs without 
registering those tables in the {TableEnvironment} beforehand.

First, we should point out that there are actually two kinds of catalog in Flink. 
The first one is the external catalog as we mentioned before; it provides CRUD 
operations on databases/tables.
The second one is the Calcite catalog; it defines the namespace that can be accessed in 
Calcite queries. It depends on the Calcite Schema/Table abstraction. The SqlValidator 
and SqlConverter depend on the Calcite catalog to fetch the tables in SQL or 
TableAPI.

So we need to do the following things:
1. introduce an interface for the external catalog, and maybe provide an in-memory 
implementation first for test and development environments.
2. introduce a mechanism to connect the external catalog with the Calcite catalog so that 
the tables/databases in the external catalog can be accessed in the Calcite catalog. 
This includes converting databases of the externalCatalog to Calcite sub-schemas and 
converting tables in a database of the externalCatalog to Calcite tables.
3. register the external catalog to the {TableEnvironment}.

> Introduce interface for catalog, and provide an in-memory implementation, and 
> integrate with calcite schema
> ---
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> The {TableEnvironment} now provides a mechanism to register temporary table. 
> It registers the temp table to calcite catalog, so SQL and TableAPI queries 
> can access to thoese temp tables.
> This issue wants to provides an mechanism to connector external catalogs, so 
> SQL and TableAPI queries could access to tables in the external catalogs 
> without register those tables to {TableEnvironment} beforehand.
> First, we should point out that there are two kinds of catalog in Flink 
> actually. 
> The first one is external catalog as we mentioned before, it provides CRUD 
> operations to databases/tables.
> The second one is calcite catalog, it defines namespace that can be accessed 
> in Calcite queries. It depends on Calcite Schema/Table abstraction. 
> SqlValidator and SqlConverter depends on the calcite catalog to fetch the 
> tables in SQL or TableAPI.
> So we need to do the following things:
> 1. introduce interface for external catalog, maybe provide an in-memory 
> implementation first for test and develop environment.
> 2. introduce a mechanism to connect external catalog with Calcite catalog so 
> the tables/databases in external catalog can be accessed in Calcite catalog. 
> Including convert databases of externalCatalog to Calcite sub-schemas, 
> convert tables in a database of externalCatalog to  Calcite tables.
> 3. register external catalog to {TableEnvironment}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation. Integrate external catalog with calcite catalog.

2017-02-22 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5568:
-
Summary: Introduce interface for catalog, and provide an in-memory 
implementation. Integrate external catalog with calcite catalog.  (was: 
Introduce interface for catalog, and provide an in-memory implementation, 
migrate current table registration to in-memory catalog)

> Introduce interface for catalog, and provide an in-memory implementation. 
> Integrate external catalog with calcite catalog.
> --
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)