[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3409


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107722255
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -246,6 +252,35 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   }
 
   /**
+* Registers a [[ExternalCatalog]] under a unique name in the 
TableEnvironment's catalog.
+* The databases and tables in the registered external catalog can be 
referenced in SQL queries.
+*
+* @param name The name under which the externalCatalog will 
be registered.
+* @param externalCatalog The externalCatalog to register.
+*/
+  def registerExternalCatalog(name: String, externalCatalog: 
ExternalCatalog): Unit = {
+if (rootSchema.getSubSchema(name) != null) {
+  throw new ExternalCatalogAlreadyExistException(name)
+}
+this.externalCatalogs.put(name, externalCatalog)
+// create an external catalog Calcite schema, register it on the root 
schema
+ExternalCatalogSchema.create(rootSchema, name, externalCatalog)
--- End diff --

rename `create` -> `registerCatalog`




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107724116
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -511,15 +511,15 @@ case class Join(
 }
 
 case class CatalogNode(
-tableName: String,
-rowType: RelDataType) extends LeafNode {
+rowType: RelDataType,
--- End diff --

change order of parameters




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107636567
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -511,15 +511,15 @@ case class Join(
 }
 
 case class CatalogNode(
-tableName: String,
-rowType: RelDataType) extends LeafNode {
+rowType: RelDataType,
+tablePath: String*) extends LeafNode {
--- End diff --

ok




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107636115
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,32 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Scans a table from registered temporary tables and registered 
catalogs.
+*
+* The table to scan must be registered in the TableEnvironment or
+* must exist in registered catalog in the TableEnvironment.
+*
+* Example:
+*
+* to scan a registered temporary table
+* {{{
+*   val tab: Table = tableEnv.scan("tableName")
+* }}}
+*
+* to scan a table from a registered catalog
+* {{{
+*   val tab: Table = tableEnv.scan("catalogName.dbName.tableName")
+* }}}
+*
+* @param tablePath The path of the table to scan.
+* @throws TableException if no table is found using the given table 
path.
+* @return The resulting [[Table]].
+*/
+  @throws[TableException]
+  def scan(tablePath: String): Table = {
--- End diff --

good idea, thanks




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107347170
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,32 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Scans a table from registered temporary tables and registered 
catalogs.
+*
+* The table to scan must be registered in the TableEnvironment or
+* must exist in registered catalog in the TableEnvironment.
+*
+* Example:
+*
+* to scan a registered temporary table
+* {{{
+*   val tab: Table = tableEnv.scan("tableName")
+* }}}
+*
+* to scan a table from a registered catalog
+* {{{
+*   val tab: Table = tableEnv.scan("catalogName.dbName.tableName")
+* }}}
+*
+* @param tablePath The path of the table to scan.
+* @throws TableException if no table is found using the given table 
path.
+* @return The resulting [[Table]].
+*/
+  @throws[TableException]
+  def scan(tablePath: String): Table = {
--- End diff --

Alright, I found a solution to the problem.
We can actually define a varargs method in Scala that can be used from Java 
with varargs. 

```
import _root_.scala.annotation.varargs

@varargs def scan(tablePath: String*): Table = {
  scanInternal(tablePath.toArray)
}
```

This way we do not need to parse the string and have unified usage in Java 
and Scala.
All TableEnvironments will only have a single varargs method.
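
The `@varargs` approach above can be sketched in a self-contained form; `Table` and `scanInternal` below are simplified stand-ins for illustration, not Flink's actual types.

```scala
import scala.annotation.varargs

// Simplified stand-in for Flink's Table (an assumption for this sketch).
final case class Table(tablePath: Seq[String])

object DemoTableEnvironment {
  // Internal implementation works on an array, as in the snippet above.
  private def scanInternal(tablePath: Array[String]): Table =
    Table(tablePath.toSeq)

  // @varargs makes the Scala compiler emit an additional scan(String...)
  // overload, so Java callers can also use varargs syntax.
  @varargs
  def scan(tablePath: String*): Table =
    scanInternal(tablePath.toArray)
}
```

From Scala this is called as `scan("catalogName", "dbName", "tableName")`; the generated overload lets Java call it the same way.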




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107009220
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,32 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Scans a table from registered temporary tables and registered 
catalogs.
+*
+* The table to scan must be registered in the TableEnvironment or
+* must exist in registered catalog in the TableEnvironment.
+*
+* Example:
+*
+* to scan a registered temporary table
+* {{{
+*   val tab: Table = tableEnv.scan("tableName")
+* }}}
+*
+* to scan a table from a registered catalog
+* {{{
+*   val tab: Table = tableEnv.scan("catalogName.dbName.tableName")
+* }}}
+*
+* @param tablePath The path of the table to scan.
+* @throws TableException if no table is found using the given table 
path.
+* @return The resulting [[Table]].
+*/
+  @throws[TableException]
+  def scan(tablePath: String): Table = {
--- End diff --

Initially, @beyond1920 had added a `scan(catalog: String, db: String, 
table: String)` method to the root `TableEnvironment` in addition to the 
`scan(table: String)` method. 

I had suggested to just use a single varargs method, but since we have Java 
and Scala environments the varargs method needed to go into the specific 
`TableEnvironment` implementations. However, I forgot that the Java 
environments are also implemented in Scala. I assume we cannot define Java 
varargs for those. That's probably why we ended up with the parsing approach. 
Not sure if this is better than having two methods in the root 
`TableEnvironment`.
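
For context, the parsing approach mentioned here boils down to splitting a dotted path into its components. A minimal sketch (the object and method names are hypothetical):

```scala
// Splits a dotted table path such as "catalogName.dbName.tableName"
// into its components; a single-element path refers to a temporary table.
object TablePathParser {
  def parse(tablePath: String): Array[String] =
    tablePath.split('.')
}
```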




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107005694
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -246,6 +251,36 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   }
 
   /**
+* Registers a [[ExternalCatalog]] under a unique name in the 
TableEnvironment's catalog.
+* The databases and tables in the registered external catalog can be 
referenced in SQL queries.
+*
+* @param name The name under which the externalCatalog will 
be registered.
+* @param externalCatalog The externalCatalog to register.
+*/
+  def registerExternalCatalog(name: String, externalCatalog: 
ExternalCatalog): Unit = {
+if (this.externalCatalogs.contains(name)) {
+  throw new ExternalCatalogAlreadyExistException(name)
+}
+this.externalCatalogs.put(name, externalCatalog)
+// create an external catalog Calcite schema, register it on the root 
schema
+ExternalCatalogSchema.create(rootSchema, name, externalCatalog)
+  }
+
+  /**
+* Gets a registered [[ExternalCatalog]] by name
+*
+* @param name The name under which the externalCatalog was previously 
registered.
+* @return the externalCatalog found by name
+*/
+  def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
--- End diff --

I see, good point!




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107009611
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -180,4 +180,31 @@ class StreamTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Scans a table from registered temporary tables and registered 
catalogs.
+*
+* The table to scan must be registered in the TableEnvironment or
+* must exist in registered catalog in the TableEnvironment.
+*
+* Example:
+*
+* to scan a registered temporary table
+* {{{
+*   val tab: Table = tableEnv.scan("tableName")
+* }}}
+*
+* to scan a table from a registered catalog
+* {{{
+*   val tab: Table = tableEnv.scan("catalogName.dbName.tableName")
+* }}}
+*
+* @param tablePath The path of the table to scan.
+* @throws TableException if no table is found using the given table 
path.
+* @return The resulting [[Table]].
+*/
+  @throws[TableException]
+  def scan(tablePath: String): Table = {
--- End diff --

That would mean that the Scala `TableEnvironment` has a varargs *and* the 
String parsing method.




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107103482
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -511,15 +511,15 @@ case class Join(
 }
 
 case class CatalogNode(
-tableName: String,
-rowType: RelDataType) extends LeafNode {
+rowType: RelDataType,
+tablePath: String*) extends LeafNode {
--- End diff --

I agree with @twalthr on this one. Varargs are nice syntactic sugar for 
APIs but internally, we should rather use arrays.




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107061507
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -511,15 +511,15 @@ case class Join(
 }
 
 case class CatalogNode(
-tableName: String,
-rowType: RelDataType) extends LeafNode {
+rowType: RelDataType,
+tablePath: String*) extends LeafNode {
--- End diff --

Is using arrays instead of var-args in internal classes a convention? I could 
do it this way; however, var-args are more convenient to use than arrays in 
some cases.




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107060990
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -165,3 +165,31 @@ case class AmbiguousTableSourceConverterException(
 
   def this(tableType: String) = this(tableType, null)
 }
+
+/**
+  * Exception for operation on a nonexistent external catalog
+  *
+  * @param catalogName external catalog name
+  * @param cause
+  */
+case class ExternalCatalogNotExistException(
--- End diff --

Hi @twalthr, do you mean that we could introduce two superclasses, 
AlreadyExistException and NotExistException, and make the other exceptions 
subclasses of these two? IMO, there is no need to introduce a superclass, 
because each exception represents a different unexpected situation that users 
may care about; they may want to know whether it is a database, a table, or 
something else that does not exist yet. What's your opinion?




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107059962
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,32 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Scans a table from registered temporary tables and registered 
catalogs.
+*
+* The table to scan must be registered in the TableEnvironment or
+* must exist in registered catalog in the TableEnvironment.
+*
+* Example:
+*
+* to scan a registered temporary table
+* {{{
+*   val tab: Table = tableEnv.scan("tableName")
+* }}}
+*
+* to scan a table from a registered catalog
+* {{{
+*   val tab: Table = tableEnv.scan("catalogName.dbName.tableName")
+* }}}
+*
+* @param tablePath The path of the table to scan.
+* @throws TableException if no table is found using the given table 
path.
+* @return The resulting [[Table]].
+*/
+  @throws[TableException]
+  def scan(tablePath: String): Table = {
--- End diff --

Hi @twalthr, there was already a scan method, `def scan(tableName: String)`; 
I added another, `def scan(catalogName: String, dbName: String, tableName: 
String)`, in the first commit. Fabian suggested that we could extend the 
previous `scan(String)` to accept varargs parameters, and that we would need 
to push the implementation to the Scala / Java versions of 
`BatchTableEnvironment` and `StreamTableEnvironment` because varargs are 
handled differently by Scala and Java. In this way, we could keep the API more 
concise because we only need a single `scan()` method. I think it's a good 
idea, so I updated the PR in the second commit. Maybe I confused something 
there. Any advice? @fhueske, @twalthr.




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106920718
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -165,3 +165,31 @@ case class AmbiguousTableSourceConverterException(
 
   def this(tableType: String) = this(tableType, null)
 }
+
+/**
+  * Exception for operation on a nonexistent external catalog
+  *
+  * @param catalogName external catalog name
+  * @param cause
+  */
+case class ExternalCatalogNotExistException(
--- End diff --

Maybe we should introduce some superclasses, because there are many 
exceptions that users might want to catch at once.
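
The superclass idea could look roughly like the sketch below; the class names and messages are illustrative assumptions, not the API that was eventually merged.

```scala
// A common parent lets users catch every "does not exist" failure at once.
class NotExistException(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

case class ExternalCatalogNotExistException(catalogName: String, cause: Throwable)
  extends NotExistException(s"External catalog $catalogName does not exist.", cause) {
  def this(catalogName: String) = this(catalogName, null)
}

case class DatabaseNotExistException(dbName: String, cause: Throwable)
  extends NotExistException(s"Database $dbName does not exist.", cause) {
  def this(dbName: String) = this(dbName, null)
}
```

A caller can then write `catch { case e: NotExistException => ... }` instead of enumerating each subclass.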




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106921688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -511,15 +511,15 @@ case class Join(
 }
 
 case class CatalogNode(
-tableName: String,
-rowType: RelDataType) extends LeafNode {
+rowType: RelDataType,
+tablePath: String*) extends LeafNode {
--- End diff --

We should use arrays instead of var-args in internal classes.




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106921052
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,32 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Scans a table from registered temporary tables and registered 
catalogs.
+*
+* The table to scan must be registered in the TableEnvironment or
+* must exist in registered catalog in the TableEnvironment.
+*
+* Example:
+*
+* to scan a registered temporary table
+* {{{
+*   val tab: Table = tableEnv.scan("tableName")
+* }}}
+*
+* to scan a table from a registered catalog
+* {{{
+*   val tab: Table = tableEnv.scan("catalogName.dbName.tableName")
+* }}}
+*
+* @param tablePath The path of the table to scan.
+* @throws TableException if no table is found using the given table 
path.
+* @return The resulting [[Table]].
+*/
+  @throws[TableException]
+  def scan(tablePath: String): Table = {
--- End diff --

Why do we have two types of scan methods (one with string and one with 
var-args)?




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106921358
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 ---
@@ -180,4 +180,31 @@ class StreamTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Scans a table from registered temporary tables and registered 
catalogs.
+*
+* The table to scan must be registered in the TableEnvironment or
+* must exist in registered catalog in the TableEnvironment.
+*
+* Example:
+*
+* to scan a registered temporary table
+* {{{
+*   val tab: Table = tableEnv.scan("tableName")
+* }}}
+*
+* to scan a table from a registered catalog
+* {{{
+*   val tab: Table = tableEnv.scan("catalogName.dbName.tableName")
+* }}}
+*
+* @param tablePath The path of the table to scan.
+* @throws TableException if no table is found using the given table 
path.
+* @return The resulting [[Table]].
+*/
+  @throws[TableException]
+  def scan(tablePath: String): Table = {
--- End diff --

Can we move this to the `TableEnvironment` class to remove duplicate code?




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r10636
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -353,19 +388,48 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * The table to scan must be registered in the [[TableEnvironment]]'s 
catalog.
 *
 * @param tableName The name of the table to scan.
-* @throws ValidationException if no table is registered under the 
given name.
+* @throws TableException if no table is registered under the given 
name.
 * @return The scanned table.
 */
   @throws[ValidationException]
   def scan(tableName: String): Table = {
 if (isRegistered(tableName)) {
-  new Table(this, CatalogNode(tableName, getRowType(tableName)))
+  new Table(this, CatalogNode(getRowType(tableName), tableName))
 } else {
   throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
 }
   }
 
   /**
+* Scans a table from registered external catalog and returns the 
resulting [[Table]].
+*
+* @param catalogName The name of the catalog to look-up for the table.
+* @param dbName  The database name of the table to scan.
+* @param tableName   The table name to scan.
+* @throws ExternalCatalogNotExistException if no catalog is registered 
under the given name.
+* @throws TableException   if no database/table is 
found in the given catalog.
+* @return The scanned table.
+*/
+  @throws[ExternalCatalogNotExistException]
+  @throws[TableException]
+  def scan(catalogName: String, dbName: String, tableName: String): Table 
= {
--- End diff --

Good suggestion.




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106368629
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -246,6 +251,36 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   }
 
   /**
+* Registers a [[ExternalCatalog]] under a unique name in the 
TableEnvironment's catalog.
+* The databases and tables in the registered external catalog can be 
referenced in SQL queries.
+*
+* @param name The name under which the externalCatalog will 
be registered.
+* @param externalCatalog The externalCatalog to register.
+*/
+  def registerExternalCatalog(name: String, externalCatalog: 
ExternalCatalog): Unit = {
+if (this.externalCatalogs.contains(name)) {
--- End diff --

yes.




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106368614
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -246,6 +251,36 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   }
 
   /**
+* Registers a [[ExternalCatalog]] under a unique name in the 
TableEnvironment's catalog.
+* The databases and tables in the registered external catalog can be 
referenced in SQL queries.
+*
+* @param name The name under which the externalCatalog will 
be registered.
+* @param externalCatalog The externalCatalog to register.
+*/
+  def registerExternalCatalog(name: String, externalCatalog: 
ExternalCatalog): Unit = {
+if (this.externalCatalogs.contains(name)) {
+  throw new ExternalCatalogAlreadyExistException(name)
+}
+this.externalCatalogs.put(name, externalCatalog)
+// create an external catalog Calcite schema, register it on the root 
schema
+ExternalCatalogSchema.create(rootSchema, name, externalCatalog)
+  }
+
+  /**
+* Gets a registered [[ExternalCatalog]] by name
+*
+* @param name The name under which the externalCatalog was previously 
registered.
+* @return the externalCatalog found by name
+*/
+  def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
--- End diff --

We may need to look up the registered catalog to call its create/update/delete 
table or database methods.
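
A minimal sketch of the register/look-up pair being discussed; `ExternalCatalog` here is an empty stand-in trait and the exception name is hypothetical.

```scala
import scala.collection.mutable

trait ExternalCatalog
class CatalogAlreadyExistsException(name: String) extends RuntimeException(name)

class CatalogRegistry {
  // registered external catalog names -> catalog
  private val externalCatalogs = mutable.HashMap.empty[String, ExternalCatalog]

  def registerExternalCatalog(name: String, catalog: ExternalCatalog): Unit = {
    if (externalCatalogs.contains(name)) {
      throw new CatalogAlreadyExistsException(name)
    }
    externalCatalogs.put(name, catalog)
  }

  // Look-up is kept so that callers can later invoke the catalog's
  // create/update/delete table or database methods.
  def getRegisteredExternalCatalog(name: String): ExternalCatalog =
    externalCatalogs(name)
}
```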




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106368344
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -99,6 +101,9 @@ abstract class TableEnvironment(val config: TableConfig) 
{
   // a counter for unique attribute names
   private val attrNameCntr: AtomicInteger = new AtomicInteger(0)
 
+  // registered external catalog names -> catalog
+  private val externalCatalogs = new HashMap[String, ExternalCatalog]
--- End diff --

@fhueske, we store the catalog so that it can be retrieved later. Users may 
want to call the catalog's create/update/delete table or database methods when 
executing DDL in SQL or a similar operation in the Table API.




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106025824
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -99,6 +101,9 @@ abstract class TableEnvironment(val config: TableConfig) 
{
   // a counter for unique attribute names
   private val attrNameCntr: AtomicInteger = new AtomicInteger(0)
 
+  // registered external catalog names -> catalog
+  private val externalCatalogs = new HashMap[String, ExternalCatalog]
--- End diff --

Why do we need to store the catalogs if we register them in the 
`rootSchema`?
Can't we look up the catalog names there as well?




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106104033
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -353,19 +388,48 @@ abstract class TableEnvironment(val config: TableConfig) {
     * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
     *
     * @param tableName The name of the table to scan.
-    * @throws ValidationException if no table is registered under the given name.
+    * @throws TableException if no table is registered under the given name.
     * @return The scanned table.
     */
   @throws[ValidationException]
   def scan(tableName: String): Table = {
     if (isRegistered(tableName)) {
-      new Table(this, CatalogNode(tableName, getRowType(tableName)))
+      new Table(this, CatalogNode(getRowType(tableName), tableName))
     } else {
       throw new TableException(s"Table \'$tableName\' was not found in the registry.")
     }
   }
 
   /**
+    * Scans a table from a registered external catalog and returns the resulting [[Table]].
+    *
+    * @param catalogName The name of the catalog in which to look up the table.
+    * @param dbName      The database name of the table to scan.
+    * @param tableName   The table name to scan.
+    * @throws ExternalCatalogNotExistException if no catalog is registered under the given name.
+    * @throws TableException                   if no database / table is found in the given catalog.
+    * @return The scanned table.
+    */
+  @throws[ExternalCatalogNotExistException]
+  @throws[TableException]
+  def scan(catalogName: String, dbName: String, tableName: String): Table = {
--- End diff --

Do we need a dedicated method for this or should we extend the previous 
`scan(String)` to accept varargs parameters? We would need to push the 
implementation to the Scala / Java versions of `BatchTableEnvironment` and 
`StreamTableEnvironment` because varargs are handled differently by Scala and 
Java. This would keep the API more concise because we only need a single 
`scan()` method.

What do you think @beyond1920?
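For illustration, a self-contained sketch (plain Scala; hypothetical registries, no Flink types) of how a single varargs `scan` could cover both the internal catalog and external catalogs:

```scala
object ScanSketch {
  // Hypothetical registries standing in for the TableEnvironment's
  // internal table registry and the registered external catalogs.
  private val internalTables = Set("orders")
  private val externalCatalogs = Map("hive" -> Set("db1.tbl1"))

  // One varargs entry point instead of scan(String) plus
  // scan(catalogName, dbName, tableName).
  def scan(tablePath: String*): String = tablePath match {
    case Seq(table) if internalTables(table) =>
      s"CatalogNode($table)"
    case Seq(catalog, db, table)
        if externalCatalogs.get(catalog).exists(_.contains(s"$db.$table")) =>
      s"CatalogNode($catalog.$db.$table)"
    case _ =>
      throw new IllegalArgumentException(
        s"Table '${tablePath.mkString(".")}' was not found.")
  }
}
```

In Java the same method would surface as `scan(String... tablePath)`, and since Scala's `String*` and Java varargs compile differently, the implementation would indeed need to live in the language-specific `BatchTableEnvironment` / `StreamTableEnvironment` classes, as the reviewer notes.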




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106025992
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -246,6 +251,36 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
+    * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's catalog.
+    * The databases and tables in the registered external catalog can be referenced in SQL queries.
+    *
+    * @param name            The name under which the externalCatalog will be registered.
+    * @param externalCatalog The externalCatalog to register.
+    */
+  def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
+    if (this.externalCatalogs.contains(name)) {
--- End diff --

This should also be checked by Calcite if we add a new schema to the 
`rootSchema`, right?




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106024978
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -246,6 +251,36 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
+    * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's catalog.
+    * The databases and tables in the registered external catalog can be referenced in SQL queries.
+    *
+    * @param name            The name under which the externalCatalog will be registered.
+    * @param externalCatalog The externalCatalog to register.
+    */
+  def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
+    if (this.externalCatalogs.contains(name)) {
+      throw new ExternalCatalogAlreadyExistException(name)
+    }
+    this.externalCatalogs.put(name, externalCatalog)
+    // create an external catalog Calcite schema and register it on the root schema
+    ExternalCatalogSchema.create(rootSchema, name, externalCatalog)
+  }
+
+  /**
+    * Gets a registered [[ExternalCatalog]] by name.
+    *
+    * @param name The name under which the externalCatalog was previously registered.
+    * @return the externalCatalog found by name
+    */
+  def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
--- End diff --

Should this be a public method or will it only be used internally?




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106026297
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -353,19 +388,48 @@ abstract class TableEnvironment(val config: TableConfig) {
     * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
     *
     * @param tableName The name of the table to scan.
-    * @throws ValidationException if no table is registered under the given name.
+    * @throws TableException if no table is registered under the given name.
     * @return The scanned table.
     */
   @throws[ValidationException]
   def scan(tableName: String): Table = {
     if (isRegistered(tableName)) {
-      new Table(this, CatalogNode(tableName, getRowType(tableName)))
+      new Table(this, CatalogNode(getRowType(tableName), tableName))
     } else {
       throw new TableException(s"Table \'$tableName\' was not found in the registry.")
     }
   }
 
   /**
+    * Scans a table from a registered external catalog and returns the resulting [[Table]].
+    *
+    * @param catalogName The name of the catalog in which to look up the table.
+    * @param dbName      The database name of the table to scan.
+    * @param tableName   The table name to scan.
+    * @throws ExternalCatalogNotExistException if no catalog is registered under the given name.
+    * @throws TableException                   if no database / table is found in the given catalog.
+    * @return The scanned table.
+    */
+  @throws[ExternalCatalogNotExistException]
+  @throws[TableException]
+  def scan(catalogName: String, dbName: String, tableName: String): Table = {
+    if (this.externalCatalogs.contains(catalogName)) {
--- End diff --

This check can also be done by the `rootSchema`




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106026738
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
@@ -511,15 +511,15 @@ case class Join(
 }
 
 case class CatalogNode(
-tableName: String,
-rowType: RelDataType) extends LeafNode {
+rowType: RelDataType,
+tableName: String*) extends LeafNode {
--- End diff --

rename `tableName` to `tableQualifier` or `tablePath`?
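A toy stand-in (plain Scala; the real `CatalogNode` also carries a Calcite `RelDataType` and extends `LeafNode`) showing how the varargs path parameter, renamed as suggested, would behave:

```scala
// Hypothetical simplified CatalogNode with the suggested rename:
// the varargs parameter is a table path such as ("hive", "db1", "tbl1")
// for an external catalog table, or just ("orders") for an internal one.
case class CatalogNodeSketch(tablePath: String*) {
  def qualifiedName: String = tablePath.mkString(".")
}
```

The rename reads better because the varargs value is a multi-part path, not a single table name.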




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106107064
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExternalCatalogITCase.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.catalog.TableIdentifier
+import org.apache.flink.table.utils.CommonTestData
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ExternalCatalogITCase(
--- End diff --

We try not to add too many ITCases, to avoid long build cycles. Actually, I 
think we do not need an ITCase that executes a query to cover this feature.

Instead we can use the `TableTestBase` to check that a query against an 
external catalog is correctly translated.
Since all code for the four combinations of Java/Scala and 
DataStream/DataSet is shared, a unit test which extends `TableTestBase` for one 
of these combinations (preferably with Scala) is sufficient, IMO. 
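A schematic sketch (plain Scala, entirely hypothetical names) of the plan-comparison style such a `TableTestBase`-based unit test uses: translate the query to a plan string and compare it with the expected plan, without executing anything on a cluster:

```scala
object PlanTestSketch {
  // Stand-in "translator" that resolves a catalog-qualified table path
  // into a logical scan node string, mimicking what plan tests inspect.
  def translate(tablePath: Seq[String]): String =
    s"LogicalTableScan(table=[[${tablePath.mkString(", ")}]])"

  // Plan tests assert on the translated plan, not on query results,
  // which is why they are much cheaper than ITCases.
  def verifyPlan(tablePath: Seq[String], expected: String): Boolean =
    translate(tablePath) == expected
}
```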




[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-02-24 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3409

[flink-5570] [Table API & SQL]Support register external catalog to table 
environment

This PR aims to support registering an external catalog with the TableEnvironment.
The PR contains two commits. The first one is about 
https://issues.apache.org/jira/browse/FLINK-5568; its content is the same as 
(https://github.com/apache/flink/pull/3406).
The second commit is to support externalCatalog registration, so please 
focus on the second commit when you review this PR.

The main changes in the second commit include:
1. add a registerExternalCatalog method to TableEnvironment to register an 
external catalog
2. add a scan method to TableEnvironment to scan a table of an external 
catalog
3. add test cases for ExternalCatalog, including registration and scan
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5570

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3409.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3409


commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476
Author: jingzhang 
Date:   2017-02-22T11:28:08Z

Introduce an interface for external catalogs, and provide an in-memory 
implementation for testing and development. Integrate with the Calcite catalog.

commit 05e2b13847fab01e330d4bf2232886a793f7dd0c
Author: jingzhang 
Date:   2017-02-24T06:10:50Z

Support register external catalog to table environment



