[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3829


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139621970
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -762,13 +763,10 @@ class Table(
 * @tparam T The data type that the [[TableSink]] expects.
 */
   def writeToSink[T](sink: TableSink[T]): Unit = {
-
-def queryConfig = this.tableEnv match {
-  case s: StreamTableEnvironment => s.queryConfig
-  case b: BatchTableEnvironment => new BatchQueryConfig
-  case _ => null
+val queryConfig = Option(this.tableEnv) match {
--- End diff --

ah, I see. Makes sense. 


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139621820
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -130,6 +130,72 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] with given field names and types 
in this
+* [[TableEnvironment]]'s catalog. Registered sink tables can be 
referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink and its field names and types
+* {{{
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val fieldTypes: Array[TypeInformation[_]]  = Array(Types.STRING, 
Types.INT, Types.LONG)
+*   val tableSink: TableSink = new YourTableSinkImpl(...)
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", fieldNames, 
fieldsTypes, tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name The name under which the [[TableSink]] is registered.
+* @param fieldNames The field names related to the [[TableSink]].
+* @param fieldTypes The field types related to the [[TableSink]].
+* @param tableSink The [[TableSink]] to register.
+*/
+  def registerTableSink(
+  name: String,
+  fieldNames: Array[String],
+  fieldTypes: Array[TypeInformation[_]],
+  tableSink: TableSink[_]): Unit = {
+checkValidTableName(name)
+if (null == fieldNames || null == fieldTypes) {
+  throw new TableException("fieldNames and fieldTypes should not be 
empty!")
+}
+if (fieldNames.length != fieldTypes.length) {
+  val errorMsg = "Field names and types should have same length! 
Passing fieldNames " +
+s"length: ${fieldNames.length} but fieldTypes length: 
${fieldTypes.length}"
+  throw new TableException(errorMsg)
+}
+
+tableSink match {
+  case streamTableSink@(_: AppendStreamTableSink[_] | _: 
UpsertStreamTableSink[_] |
+  _: RetractStreamTableSink[_]) =>
+val configuredSink = streamTableSink.configure(fieldNames, 
fieldTypes)
+registerTableInternal(name, new TableSinkTable(configuredSink))
+  case _ =>
+throw new TableException(
+  "Only AppendStreamTableSink, UpsertStreamTableSink and 
RetractStreamTableSink can be " +
+"registered in StreamTableEnvironment.")
+}
+  }
+
+  /**
+* Registers a [[Table]] under a unique name in the TableEnvironment's 
catalog.
+* Registered tables can be referenced in SQL queries.
+*
+* @param name  The name under which the table will be registered.
+* @param table The table to register.
+*/
+  override def registerTable(
--- End diff --

thx for the confirmation


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139580604
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -130,6 +130,72 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] with given field names and types 
in this
+* [[TableEnvironment]]'s catalog. Registered sink tables can be 
referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink and its field names and types
+* {{{
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val fieldTypes: Array[TypeInformation[_]]  = Array(Types.STRING, 
Types.INT, Types.LONG)
+*   val tableSink: TableSink = new YourTableSinkImpl(...)
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", fieldNames, 
fieldsTypes, tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name The name under which the [[TableSink]] is registered.
+* @param fieldNames The field names related to the [[TableSink]].
+* @param fieldTypes The field types related to the [[TableSink]].
+* @param tableSink The [[TableSink]] to register.
+*/
+  def registerTableSink(
+  name: String,
+  fieldNames: Array[String],
+  fieldTypes: Array[TypeInformation[_]],
+  tableSink: TableSink[_]): Unit = {
+checkValidTableName(name)
+if (null == fieldNames || null == fieldTypes) {
+  throw new TableException("fieldNames and fieldTypes should not be 
empty!")
+}
+if (fieldNames.length != fieldTypes.length) {
+  val errorMsg = "Field names and types should have same length! 
Passing fieldNames " +
+s"length: ${fieldNames.length} but fieldTypes length: 
${fieldTypes.length}"
+  throw new TableException(errorMsg)
+}
+
+tableSink match {
+  case streamTableSink@(_: AppendStreamTableSink[_] | _: 
UpsertStreamTableSink[_] |
+  _: RetractStreamTableSink[_]) =>
+val configuredSink = streamTableSink.configure(fieldNames, 
fieldTypes)
+registerTableInternal(name, new TableSinkTable(configuredSink))
+  case _ =>
+throw new TableException(
+  "Only AppendStreamTableSink, UpsertStreamTableSink and 
RetractStreamTableSink can be " +
+"registered in StreamTableEnvironment.")
+}
+  }
+
+  /**
+* Registers a [[Table]] under a unique name in the TableEnvironment's 
catalog.
+* Registered tables can be referenced in SQL queries.
+*
+* @param name  The name under which the table will be registered.
+* @param table The table to register.
+*/
+  override def registerTable(
--- End diff --

Seems it was unexpectedly auto-generated by IDEA; we do not need an override here.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139582270
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -762,13 +763,10 @@ class Table(
 * @tparam T The data type that the [[TableSink]] expects.
 */
   def writeToSink[T](sink: TableSink[T]): Unit = {
-
-def queryConfig = this.tableEnv match {
-  case s: StreamTableEnvironment => s.queryConfig
-  case b: BatchTableEnvironment => new BatchQueryConfig
-  case _ => null
+val queryConfig = Option(this.tableEnv) match {
--- End diff --

I did think so, but encountered an NPE when testing `CorrelateValidationTest.testInvalidTableFunction`:
```
 // table function call writeToSink
expectExceptionThrown(
  func1('c).writeToSink(null),
  "Cannot translate a query with an unbounded table function call."
)
```
 so I added the `Option` here.
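
For context, the guarded match presumably completes along these lines (an in-package sketch only, since the config constructors are package-private; the arm bodies follow the removed lines in the diff above and the helper name is made up):

```
import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, QueryConfig, StreamTableEnvironment, TableEnvironment}

// Wrapping tableEnv in Option makes the null case explicit, so the failing
// validation test reaches its expected exception instead of hitting an NPE.
def queryConfigOf(tableEnv: TableEnvironment): QueryConfig =
  Option(tableEnv) match {
    case Some(s: StreamTableEnvironment) => s.queryConfig
    case Some(b: BatchTableEnvironment)  => new BatchQueryConfig
    case _                               => null
  }
```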


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139525613
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,23 +521,130 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * @param query The SQL query to evaluate.
 * @return The result of the query as Table.
 */
+  @deprecated("use [[sqlQuery()]] instead")
   def sql(query: String): Table = {
+sqlQuery(query)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param query The SQL query to evaluate.
+* @return The result of the query as Table
+*/
+  def sqlQuery(query: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(query)
-// validate the sql query
-val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported SQL query! sqlQuery() only accepts SQL queries of 
type SELECT, UNION, " +
+  "INTERSECT, EXCEPT, VALUES, WITH, ORDER_BY, and EXPLICIT_TABLE.")
+}
+  }
+
+  /**
+* Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL 
statement;
+* Currently only SQL INSERT statement on registered tables are 
supported.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name except table sink table(Table sink tables are not 
automatically registered via
+* [[Table.toString]]). So it allows to call SQL directly on tables 
like this:
+*
+* {{{
+*   // should register the table sink which will be inserted into
+*   tEnv.registerTableSink("target_table", fieldNames, fieldsTypes, 
tableSink)
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param stmt The SQL statement to evaluate.
+*/
+  def sqlUpdate(stmt: String): Unit = {
+sqlUpdate(stmt, this.queryConfig)
+  }
+
+  /**
+* Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL 
statement;
+* Currently only SQL INSERT statement on registered tables are 
supported.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
--- End diff --

same as above


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139525263
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,23 +521,130 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * @param query The SQL query to evaluate.
 * @return The result of the query as Table.
 */
+  @deprecated("use [[sqlQuery()]] instead")
   def sql(query: String): Table = {
+sqlQuery(query)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param query The SQL query to evaluate.
+* @return The result of the query as Table
+*/
+  def sqlQuery(query: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(query)
-// validate the sql query
-val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported SQL query! sqlQuery() only accepts SQL queries of 
type SELECT, UNION, " +
+  "INTERSECT, EXCEPT, VALUES, WITH, ORDER_BY, and EXPLICIT_TABLE.")
+}
+  }
+
+  /**
+* Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL 
statement;
+* Currently only SQL INSERT statement on registered tables are 
supported.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
--- End diff --

Rephrase: "A [[Table]] is automatically registered when the 
[[Table.toString()]] method is called, i.e., when it is embedded into a string. 
Hence, SQL queries can directly reference a [[Table]] as follows: "


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139530317
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +798,68 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableName
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+*/
+  def insertInto(tableName: String): Unit = {
+insertInto(tableName, this.tableEnv.queryConfig)
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableName
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+*/
+  def insertInto(tableName: String, conf: QueryConfig): Unit = {
+require(tableName != null && !tableName.isEmpty, "tableSink must not 
be null or empty.")
+// validate if the tableSink is registered
+if (!tableEnv.isRegistered(tableName)) {
--- End diff --

I think it is better to move the implementation to a method `TableEnvironment.insertTableInto(table: Table, sinkTable: String, conf: QueryConfig)`. This would also allow us to make `TableEnvironment.getTable()` `private`.
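
A minimal sketch of what the suggested method could look like inside `TableEnvironment` (visibility, error messages, and the internal helper are assumptions; `TableSinkTable` and `isRegistered` as in the diffs above):

```
// Sketch: centralizing the sink lookup in TableEnvironment lets
// Table.insertInto delegate here, and getTable() can become private.
protected[flink] def insertTableInto(table: Table, sinkTable: String, conf: QueryConfig): Unit = {
  require(sinkTable != null && !sinkTable.isEmpty, "sinkTable must not be null or empty.")
  if (!isRegistered(sinkTable)) {
    throw new TableException(s"No table registered under the name '$sinkTable'.")
  }
  getTable(sinkTable) match {
    case s: TableSinkTable[_] =>
      // assumed internal helper that translates `table` into the configured sink
      writeToSink(table, s.tableSink, conf)
    case _ =>
      throw new TableException(s"'$sinkTable' is not a registered TableSink.")
  }
}
```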


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139470670
  
--- Diff: docs/dev/table/common.md ---
@@ -458,7 +595,7 @@ Table API and SQL queries are translated into 
[DataStream]({{ site.baseurl }}/de
 
 A Table API or SQL query is translated when:
 
-* the `Table` is emitted to a `TableSink`, i.e., when 
`Table.writeToSink()` is called.
+* the `Table` is emitted to a `TableSink` or a registered sink table, 
i.e., when `Table.writeToSink()` or `Table.insertInto()`is called.
--- End diff --

rephrase to 
` the `Table` is emitted to a `TableSink` (provided or registered)`


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139468211
  
--- Diff: docs/dev/table/common.md ---
@@ -387,6 +433,59 @@ Table revenue = tableEnv.sql("""
 
 
 
The following example shows how to specify an update query that inserts the result into a registered Table.
+
+
+
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment 
equivalently
+StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+// register Orders table
+...
+
+// register SinkResult table
--- End diff --

change to `// register RevenueFrance table as TableSink`?


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139466642
  
--- Diff: docs/dev/table/common.md ---
@@ -236,6 +236,52 @@ tableEnv.registerTableSource("CsvTable", csvSource)
 
 {% top %}
 
+### Register a TableSink
+
+A `TableSink` [emits a Table](common.html#emit-a-table) to an external 
storage system, such as a database, key-value store, message queue, or file 
system (in different encodings, e.g., CSV, Parquet, or ORC).
+
+Flink aims to provide TableSources for common data formats and storage 
systems. Please see the documentation about [Table Sources and Sinks]({{ 
site.baseurl }}/dev/table/sourceSinks.html) page for details about available 
sinks and instructions for how to implement a custom `TableSink`.
+
+A `TableSink` is registered in a `TableEnvironment` as follows:
--- End diff --

We should add a short paragraph to the `Register a Table in the Catalog` section explaining why Table, TableSource, and TableSink need to be registered. The subsection should link to the sections that show how registered tables are used (SQL: `FROM`, `INSERT INTO`; Table API: `scan()`, `insertInto()`).


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139472344
  
--- Diff: docs/dev/table/sql.md ---
@@ -169,6 +202,37 @@ Flink SQL uses a lexical policy for identifier (table, 
attribute, function names
 Operations
 
 
+### InsertInto
+
+
+
+  
+
+  Operation
+  Description
+
+  
+  
+   
+  
+Order By
--- End diff --

`Order By` -> `Insert Into`


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139522124
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -106,6 +106,56 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] with given field names and types 
in this
+* [[TableEnvironment]]'s catalog. Registered sink tables can be 
referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink and its field names and types
+* {{{
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val fieldTypes: Array[TypeInformation[_]]  = Array(Types.STRING, 
Types.INT, Types.LONG)
+*   val tableSink: TableSink = new YourTableSinkImpl(...)
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", fieldNames, 
fieldsTypes, tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name  The name under which the [[TableSink]] is 
registered.
+* @param tableSink The [[TableSink]] to register.
--- End diff --

add `fieldNames` and `fieldTypes` parameters


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139470224
  
--- Diff: docs/dev/table/common.md ---
@@ -407,6 +506,14 @@ A batch `Table` can only be written to a 
`BatchTableSink`, while a streaming tab
 
 Please see the documentation about [Table Sources & Sinks]({{ site.baseurl 
}}/dev/table/sourceSinks.html) for details about available sinks and 
instructions for how to implement a custom `TableSink`.
 
+There're two ways to emit a table:
+
+1. Using `Table.writeToSink` which derives output schema from the input 
table.
--- End diff --

Rephrase:
```
1. `Table.writeToSink(TableSink sink)` emits the table using the provided TableSink and configures the sink with the schema of the table to emit.
2. `Table.insertInto(String table)` looks up the TableSink that was registered under the name `table` with a specific schema in the catalog. The emission fails if `table` was not registered or if the schema of the registered TableSink does not match the schema of the table to emit.
```


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139471847
  
--- Diff: docs/dev/table/sql.md ---
@@ -89,15 +117,16 @@ val result2 = tableEnv.sql(
 Supported Syntax
 
 
-Flink parses SQL using [Apache 
Calcite](https://calcite.apache.org/docs/reference.html), which supports 
standard ANSI SQL. DML and DDL statements are not supported by Flink.
+Flink parses SQL using [Apache 
Calcite](https://calcite.apache.org/docs/reference.html), which supports 
standard ANSI SQL. DDL statements are not supported by Flink.
 
 The following BNF-grammar describes the superset of supported SQL features 
in batch and streaming queries. The [Operations](#operations) section shows 
examples for the supported features and indicates which features are only 
supported for batch or streaming queries.
 
 ```
 
 query:
   values
-  | {
+  |[ insert into tableReference ]
--- End diff --

change to `|[ insert tableReference ]` and verify.
It should actually also be moved before `values`, because we can use it to write literal rows to the sink as well.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139471292
  
--- Diff: docs/dev/table/sql.md ---
@@ -49,15 +49,29 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
 
 // SQL query with an inlined (unregistered) table
 Table table = tableEnv.toTable(ds, "user, product, amount");
-Table result = tableEnv.sql(
+Table result = tableEnv.sqlQuery(
   "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
 
 // SQL query with a registered table
 // register the DataStream as table "Orders"
 tableEnv.registerDataStream("Orders", ds, "user, product, amount");
 // run a SQL query on the Table and retrieve the result as a new Table
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// register the DataStream as table "Orders"
+tableEnv.registerDataStream("Orders", ds, "user, product, amount");
+// create a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...);
+// define the field names and types
+String[] fieldNames = {"id", "product", "amount"};
--- End diff --

fix the schema of the result table to match the schema of the query result


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139526993
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,23 +521,130 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * @param query The SQL query to evaluate.
 * @return The result of the query as Table.
 */
+  @deprecated("use [[sqlQuery()]] instead")
   def sql(query: String): Table = {
+sqlQuery(query)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param query The SQL query to evaluate.
+* @return The result of the query as Table
+*/
+  def sqlQuery(query: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(query)
-// validate the sql query
-val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported SQL query! sqlQuery() only accepts SQL queries of 
type SELECT, UNION, " +
+  "INTERSECT, EXCEPT, VALUES, WITH, ORDER_BY, and EXPLICIT_TABLE.")
+}
+  }
+
+  /**
+* Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL 
statement;
+* Currently only SQL INSERT statement on registered tables are 
supported.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name except table sink table(Table sink tables are not 
automatically registered via
+* [[Table.toString]]). So it allows to call SQL directly on tables 
like this:
+*
+* {{{
+*   // should register the table sink which will be inserted into
+*   tEnv.registerTableSink("target_table", fieldNames, fieldsTypes, 
tableSink)
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param stmt The SQL statement to evaluate.
+*/
+  def sqlUpdate(stmt: String): Unit = {
+sqlUpdate(stmt, this.queryConfig)
+  }
+
+  /**
+* Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL 
statement;
+* Currently only SQL INSERT statement on registered tables are 
supported.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name except table sink table(Table sink tables are not 
automatically registered via
+* [[Table.toString]]). So it allows to call SQL directly on tables 
like this:
+*
+* {{{
+*   // should register the table sink which will be inserted into
+*   tEnv.registerTableSink("target_table", fieldNames, fieldsTypes, 
tableSink)
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param stmt The SQL statement to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(stmt)
+parsed match {
+  case insert: SqlInsert => {
+// validate the sql query
+planner.validate(parsed)
+
+// validate sink table
+val targetName = 

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139468366
  
--- Diff: docs/dev/table/common.md ---
@@ -387,6 +433,59 @@ Table revenue = tableEnv.sql("""
 
 
 
+The following example shows how to specify an update query that inserts the result into a registered Table.
+
+
+
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment 
equivalently
+StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+// register Orders table
+...
+
+// register SinkResult table
+...
+
+// compute revenue for all customers from France and emit to SinkResult
+tableEnv.sqlUpdate(
+"INSERT INTO SinkResult" +
--- End diff --

add whitespace to end of String
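
That is, the concatenated statement should read as below (a sketch in Scala for consistency, with `tableEnv` as in the quoted example):

```
// Note the trailing space after "SinkResult " so the pieces join into
// one well-formed SQL statement.
tableEnv.sqlUpdate(
  "INSERT INTO SinkResult " +
  "SELECT cID, cName, SUM(revenue) AS revSum " +
  "FROM Orders " +
  "WHERE cCountry = 'FRANCE' " +
  "GROUP BY cID, cName")
```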


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139529093
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -554,10 +680,14 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * @param name The table name to check.
 * @return true, if a table is registered under the name, false 
otherwise.
 */
-  protected def isRegistered(name: String): Boolean = {
+  protected[flink] def isRegistered(name: String): Boolean = {
 rootSchema.getTableNames.contains(name)
   }
 
+  protected[flink] def getTable(name: String): 
org.apache.calcite.schema.Table = {
--- End diff --

can we make this a `private` method? Otherwise it will be visible from Java 
and confuse users (the returned Table is not a Table API Table but a Calcite 
Table).


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139471593
  
--- Diff: docs/dev/table/sql.md ---
@@ -71,15 +85,29 @@ val ds: DataStream[(Long, String, Integer)] = 
env.addSource(...)
 
 // SQL query with an inlined (unregistered) table
 val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
-val result = tableEnv.sql(
+val result = tableEnv.sqlQuery(
   s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
 
 // SQL query with a registered table
 // register the DataStream under the name "Orders"
 tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
 // run a SQL query on the Table and retrieve the result as a new Table
-val result2 = tableEnv.sql(
+val result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+
+// SQL update with a registered table
+// register the DataStream as table "Orders"
+tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
+// create a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...)
+// define the field names and types
+val fieldNames: Array[String] = Array("id", "product", "amount")
--- End diff --

fix schema of result (must match query result schema)


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139471468
  
--- Diff: docs/dev/table/common.md ---
@@ -387,6 +433,59 @@ Table revenue = tableEnv.sql("""
 
 
 
+The following example shows how to specify an update query that inserts the result into a registered Table.
+
+
+
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment 
equivalently
+StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+// register Orders table
+...
+
+// register SinkResult table
+...
+
+// compute revenue for all customers from France and emit to SinkResult
+tableEnv.sqlUpdate(
+"INSERT INTO SinkResult" +
+"SELECT cID, cName, SUM(revenue) AS revSum " +
+"FROM Orders " +
+"WHERE cCountry = 'FRANCE' " +
+"GROUP BY cID, cName"
+  );
+
+// execute query
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+// get a TableEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// register Orders table
+...
+
+// register SinkResult table
--- End diff --

same as above


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139530556
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
 ---
@@ -85,16 +84,15 @@ class FlinkPlannerImpl(
 validator = new FlinkCalciteSqlValidator(operatorTable, 
createCatalogReader, typeFactory)
 validator.setIdentifierExpansion(true)
 try {
-  validatedSqlNode = validator.validate(sqlNode)
+   validator.validate(sqlNode)
--- End diff --

indent -1 space


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139472638
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -146,6 +146,39 @@ Operations
 
 The Table API supports the following operations. Please note that not all 
operations are available in both batch and streaming yet; they are tagged 
accordingly.
 
+### Insert
+
+
+
+
+
+  
+
+  Operators
+  Description
+
+  
+  
+   
+   
+Scan
--- End diff --

`Scan` -> `Insert Into`


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139529681
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -762,13 +763,10 @@ class Table(
 * @tparam T The data type that the [[TableSink]] expects.
 */
   def writeToSink[T](sink: TableSink[T]): Unit = {
-
-def queryConfig = this.tableEnv match {
-  case s: StreamTableEnvironment => s.queryConfig
-  case b: BatchTableEnvironment => new BatchQueryConfig
-  case _ => null
+val queryConfig = Option(this.tableEnv) match {
--- End diff --

`tableEnv` should never be `null`. Can we make this `def queryConfig = this.tableEnv.queryConfig`?


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139524452
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,23 +521,130 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * @param query The SQL query to evaluate.
 * @return The result of the query as Table.
 */
+  @deprecated("use [[sqlQuery()]] instead")
   def sql(query: String): Table = {
+sqlQuery(query)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param query The SQL query to evaluate.
+* @return The result of the query as Table
+*/
+  def sqlQuery(query: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(query)
-// validate the sql query
-val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported SQL query! sqlQuery() only accepts SQL queries of 
type SELECT, UNION, " +
+  "INTERSECT, EXCEPT, VALUES, WITH, ORDER_BY, and EXPLICIT_TABLE.")
+}
+  }
+
+  /**
+* Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL 
statement;
+* Currently only SQL INSERT statement on registered tables are 
supported.
--- End diff --

+s `statementS`


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139473338
  
--- Diff: docs/dev/table/sql.md ---
@@ -169,6 +202,37 @@ Flink SQL uses a lexical policy for identifier (table, 
attribute, function names
 Operations
 
 
+### InsertInto
+
+
+
+  
+
+  Operation
+  Description
+
+  
+  
+   
+  
+Order By
+Batch Streaming
+  
+  
+{% highlight sql %}
--- End diff --

Highlight that `Results` must be a registered `TableSink` with the right schema, and link to the section about registering table sinks.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139526149
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,23 +521,130 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * @param query The SQL query to evaluate.
 * @return The result of the query as Table.
 */
+  @deprecated("use [[sqlQuery()]] instead")
   def sql(query: String): Table = {
+sqlQuery(query)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param query The SQL query to evaluate.
+* @return The result of the query as Table
+*/
+  def sqlQuery(query: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(query)
-// validate the sql query
-val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported SQL query! sqlQuery() only accepts SQL queries of 
type SELECT, UNION, " +
+  "INTERSECT, EXCEPT, VALUES, WITH, ORDER_BY, and EXPLICIT_TABLE.")
+}
+  }
+
+  /**
+* Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL 
statement;
+* Currently only SQL INSERT statement on registered tables are 
supported.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name except table sink table(Table sink tables are not 
automatically registered via
+* [[Table.toString]]). So it allows to call SQL directly on tables 
like this:
+*
+* {{{
+*   // should register the table sink which will be inserted into
+*   tEnv.registerTableSink("target_table", fieldNames, fieldsTypes, 
tableSink)
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param stmt The SQL statement to evaluate.
+*/
+  def sqlUpdate(stmt: String): Unit = {
+sqlUpdate(stmt, this.queryConfig)
+  }
+
+  /**
+* Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL 
statement;
+* Currently only SQL INSERT statement on registered tables are 
supported.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name except table sink table(Table sink tables are not 
automatically registered via
+* [[Table.toString]]). So it allows to call SQL directly on tables 
like this:
+*
+* {{{
+*   // should register the table sink which will be inserted into
+*   tEnv.registerTableSink("target_table", fieldNames, fieldsTypes, 
tableSink)
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param stmt The SQL statement to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(stmt)
+parsed match {
+  case insert: SqlInsert => {
+// validate the sql query
+planner.validate(parsed)
+
+// validate sink table
+val targetName = 

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139473008
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -146,6 +146,39 @@ Operations
 
 The Table API supports the following operations. Please note that not all 
operations are available in both batch and streaming yet; they are tagged 
accordingly.
 
+### Insert
+
+
+
+
+
+  
+
+  Operators
+  Description
+
+  
+  
+   
+   
+Scan
+Batch Streaming
+  
+   
Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered table.
+{% highlight java %}
+tableEnv.insertInto("Results");
--- End diff --

should be `table` instead of `tableEnv`. Highlight that `Results` must be a registered `TableSink` with the right schema, and link to the section about registering table sinks.
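
A sketch of the corrected call (the surrounding method is illustrative only):

```
import org.apache.flink.table.api.Table

// insertInto is a method on Table, not on TableEnvironment; "Results" must
// name a TableSink registered with a schema matching `result`.
def emitToRegisteredSink(result: Table): Unit = {
  result.insertInto("Results")
}
```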


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139470942
  
--- Diff: docs/dev/table/sql.md ---
@@ -49,15 +49,29 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
 
 // SQL query with an inlined (unregistered) table
 Table table = tableEnv.toTable(ds, "user, product, amount");
-Table result = tableEnv.sql(
+Table result = tableEnv.sqlQuery(
   "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
 
 // SQL query with a registered table
 // register the DataStream as table "Orders"
 tableEnv.registerDataStream("Orders", ds, "user, product, amount");
 // run a SQL query on the Table and retrieve the result as a new Table
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// register the DataStream as table "Orders"
+tableEnv.registerDataStream("Orders", ds, "user, product, amount");
--- End diff --

I think we can reuse the previously registered Orders table.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139524209
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,23 +521,130 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * @param query The SQL query to evaluate.
 * @return The result of the query as Table.
 */
+  @deprecated("use [[sqlQuery()]] instead")
   def sql(query: String): Table = {
+sqlQuery(query)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param query The SQL query to evaluate.
+* @return The result of the query as Table
+*/
+  def sqlQuery(query: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(query)
-// validate the sql query
-val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported SQL query! sqlQuery() only accepts SQL queries of 
type SELECT, UNION, " +
--- End diff --

Not sure if we actually support `WITH` and `EXPLICIT_TABLE`. Everything 
else is supported (at least for batch).


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r139468885
  
--- Diff: docs/dev/table/common.md ---
@@ -407,6 +506,14 @@ A batch `Table` can only be written to a 
`BatchTableSink`, while a streaming tab
 
 Please see the documentation about [Table Sources & Sinks]({{ site.baseurl 
}}/dev/table/sourceSinks.html) for details about available sinks and 
instructions for how to implement a custom `TableSink`.
 
+There're two ways to emit a table:
--- End diff --

I try to avoid contractions in the docs. `There're` -> `There are`


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136847905
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

The problem is that we should have the correct subtype of `QueryConfig` (not only for the returned object but also in the method signature) while preserving the separation of the stream and batch APIs. The `StreamTableEnvironment.queryConfig` method must be typed to `StreamQueryConfig` to prevent casting. However, we don't want to have a method that is typed to `StreamQueryConfig` in the base class `TableEnvironment` because 1) this class should be independent of the streaming API and 2) we don't want a streaming method in the `BatchTableEnvironment`.

So, we can add a method to `TableEnvironment` that returns `QueryConfig` 
(depending on its own type a `StreamQueryConfig` or `BatchQueryConfig`) and 
override this method in `StreamTableEnvironment` such that it returns a 
`StreamQueryConfig` (and if later necessary in `BatchTableEnvironment` to 
return a `BatchQueryConfig`).

The duplicated code is < 5 lines and we have correct types everywhere.
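
A sketch of that shape (the class names are prefixed to mark them as illustrative stand-ins for the real ones):

```
import org.apache.flink.table.api.{QueryConfig, StreamQueryConfig}

// The base class stays independent of the streaming API; the subclass
// narrows the return type, so neither side needs a cast.
abstract class MyTableEnvironment {
  // typed to the base QueryConfig; concrete environments decide the subtype
  def queryConfig: QueryConfig
}

abstract class MyStreamTableEnvironment extends MyTableEnvironment {
  // covariant override: callers get a StreamQueryConfig without casting
  override def queryConfig: StreamQueryConfig
}
```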


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136850229
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -106,6 +106,43 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
--- End diff --

I proposed this API for the following reason:

I would like to have a common API and preconditions for pre-registered and on-demand table sinks, such that all (also existing) table sinks can be used in both of these use cases.

There is already the `configure()` method, which has exactly the purpose of setting the field names and types. Currently, this method is only called internally during the translation process. Of course, users could implement a `TableSink` that sets field names and types in the constructor (as shown in the Scala docs of this PR), but I think this would circumvent the current API and might lead to `TableSink` implementations that can only be used in either a pre-registered or an on-demand setting, but not in both. Hence, I think it would be better to "enforce" the use of the `configure()` method by designing the API such that `configure()` is always called internally and hence mandatory. That way we can guarantee that each `TableSink` can be used in both cases, because both use cases require `configure()` and use it in the same way.

We could ask users (with an error message, as @wuchong proposed) to call the `configure()` method before registering the table sink, or enforce this through the API. I think the second approach is better because users would not experience an exception. In my opinion, we should at least "encourage" the use of the `configure()` method by not giving an example that sets field names and types in the constructor.

What do you think @lincoln-lil and @wuchong ?
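
For example, registration could always funnel through `configure()` along these lines (a sketch; names, schema, and the CSV sink are placeholders, with `registerTableSink` as added by this PR):

```
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{StreamTableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

// The caller passes field names/types to registerTableSink, which routes
// them through TableSink.configure() before the sink enters the catalog
// (see the registerTableSink diff earlier in the thread), so the sink
// implementation does not have to fix its schema in the constructor.
def register(tableEnv: StreamTableEnvironment): Unit = {
  val fieldNames: Array[String] = Array("a", "b")
  val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
  tableEnv.registerTableSink("example_sink", fieldNames, fieldTypes,
    new CsvTableSink("/path/to/out", ","))
}
```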


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-09-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136845193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(sql: String, config: QueryConfig): Unit = {
+   

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-31 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136491885
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(sql: String, config: QueryConfig): Unit = {

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-31 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136363886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

I agree with @lincoln-lil. We can define an abstract method in 
`TableEnvironment` and implement it in `StreamTableEnvironment`, so that users 
do not need to cast `QueryConfig` to `StreamQueryConfig` manually.

```
def queryConfig: StreamQueryConfig = new StreamQueryConfig
```


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-31 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136362552
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -106,6 +106,43 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
--- End diff --

I think a pre-registered table sink should contain its own schema 
information, so I prefer the `registerTableSink(String, TableSink)` interface. If 
a table sink is registered without a schema, we can throw an exception.
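
For illustration, a minimal sketch of that guard, assuming an unconfigured sink 
reports `null` from `TableSink.getFieldNames()`/`getFieldTypes()` (the method body 
and the exception message are hypothetical, not the PR's code):

```
def registerTableSink(name: String, tableSink: TableSink[_]): Unit = {
  // hypothetical guard: reject sinks registered without schema information
  if (tableSink.getFieldNames == null || tableSink.getFieldTypes == null) {
    throw new TableException(s"Table sink '$name' must contain its schema before registration.")
  }
  registerTableInternal(name, new TableSinkTable(tableSink))
}
```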


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-31 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136325581
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

The `queryConfig` method in `StreamTableEnvironment` does the same thing as 
the overridden one; it exists only because of the narrower return type, so the 
code is duplicated.

There's another subclass, `MockTableEnvironment`, so the default method in 
`TableEnvironment` may be

```
private[flink] def queryConfig: QueryConfig = this match {
  case _: BatchTableEnvironment => new BatchQueryConfig
  case _: StreamTableEnvironment => new StreamQueryConfig
  case _ => ???
}
```

If we decide to add a `queryConfig` method, I suggest making it abstract and 
implementing it in every subclass.
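
Spelled out, the abstract variant could look like this (a sketch only; 
constructors and all other members are omitted):

```
// sketch: every concrete environment states its own QueryConfig type
abstract class TableEnvironment {
  private[flink] def queryConfig: QueryConfig
}

abstract class BatchTableEnvironment extends TableEnvironment {
  override private[flink] def queryConfig: BatchQueryConfig = new BatchQueryConfig
}

abstract class StreamTableEnvironment extends TableEnvironment {
  override private[flink] def queryConfig: StreamQueryConfig = new StreamQueryConfig
}
```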


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-31 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136321128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -106,6 +106,43 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
--- End diff --

Considering the current `registerTableSource` API, we haven't defined a 
method that requires users to provide fieldNames and fieldTypes at registration 
time. I suggest keeping the two APIs symmetric, what do you think?
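
The symmetry in question, sketched (both objects are assumed to carry their own schema):

```
// existing API: the source registration takes the source as-is
tableEnv.registerTableSource("example_source_table", tableSource)
// symmetric counterpart under discussion: the sink registration would, too
tableEnv.registerTableSink("example_sink_table", tableSink)
```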


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136163965
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String): Unit = {
+insertInto(tableName, 
QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String, conf: QueryConfig): Unit = {
+require(tableName != null && !tableName.isEmpty, "tableSink must not 
be null or empty.")
+// validate if the tableSink is registered
+if (!tableEnv.isRegistered(tableName)) {
+  throw TableException("tableSink must be registered.")
+}
+// find if the tableSink is registered //, include validation 
internally
+tableEnv.getTable(tableName) match {
+  case sink: TableSinkTable[_] => {
+// get row type info of upstream table
+val rowType = getRelNode.getRowType
+val srcFieldTypes: Array[TypeInformation[_]] = 
rowType.getFieldList.asScala
+  .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+// column count validation
+if (srcFieldTypes.length != sink.fieldTypes.length) {
+  throw TableException(s"source column count doesn't match target 
table[$tableName]'s.")
+}
+// column type validation, no need to validate field names
+if (sink.fieldTypes.zipWithIndex.exists(f => f._1 != 
srcFieldTypes(f._2))) {
+  throw TableException(s"source row type doesn't match target 
table[$tableName]'s.")
--- End diff --

The schema of the inserted table must exactly match the schema of the target 
table, e.g., inserted table: [a: INTEGER, b: VARCHAR], destination table: 
[a: INTEGER, b: VARCHAR, c: LONG].


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136125340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -1002,4 +1124,28 @@ object TableEnvironment {
 case d: DefinedFieldNames => d.getFieldIndices
 case _ => TableEnvironment.getFieldIndices(tableSource.getReturnType)
   }
+
+  /**
+* Returns field names for a given [[TableSink]].
+*
+* @param tableSink The TableSink to extract field names from.
+* @tparam A The type of the TableSink.
+* @return An array holding the field names.
+*/
+  def getFieldNames[A](tableSink: TableSink[A]): Array[String] = tableSink 
match {
+  case d: DefinedFieldNames => d.getFieldNames
+  case _ => TableEnvironment.getFieldNames(tableSink.getOutputType)
+}
+
+  /**
+* Returns field indices for a given [[TableSink]].
+*
+* @param tableSink The TableSink to extract field indices from.
+* @tparam A The type of the TableSink.
+* @return An array holding the field indices.
+*/
+  def getFieldIndices[A](tableSink: TableSink[A]): Array[Int] = tableSink 
match {
--- End diff --

We don't need this method. A `TableSink` is configured with a schema. It 
does not need indices and should not implement `DefinedFieldNames`.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136118894
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated("use [[sqlQuery()]] instead")
+  def sql(sql: String): Table = {
+sqlQuery(sql)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(sql: String, config: QueryConfig): Unit = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
-// validate the sql query
-val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
+val parsed = planner.parse(sql)
+parsed 

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136130122
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

I thought about this before. I did not add `TableEnvironment.queryConfig: 
QueryConfig` because users would always have to cast `QueryConfig` to 
`StreamQueryConfig` in order to use it. Instead there is a `queryConfig` method 
for `StreamTableEnvironment` which returns a `StreamQueryConfig`.

I would not like to expose the static creation method to users. 
How about we add this method to `TableEnvironment`:

```
private[flink] def queryConfig: QueryConfig = this match {
  case _: BatchTableEnvironment => new BatchQueryConfig
  case _: StreamTableEnvironment => new StreamQueryConfig
}
```

The method can be overridden by `StreamTableEnvironment` as
```
override def queryConfig: StreamQueryConfig = new StreamQueryConfig
```
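
From the user's side, the overridden method keeps the stream-specific type 
without a cast, e.g. (sketch):

```
// statically a StreamQueryConfig, so no cast is needed
val qConf: StreamQueryConfig = streamTableEnv.queryConfig
qConf.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
```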


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136156910
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String): Unit = {
+insertInto(tableName, 
QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String, conf: QueryConfig): Unit = {
+require(tableName != null && !tableName.isEmpty, "tableSink must not 
be null or empty.")
+// validate if the tableSink is registered
+if (!tableEnv.isRegistered(tableName)) {
+  throw TableException("tableSink must be registered.")
--- End diff --

`"tableSink must be registered"` -> `s"No table $tableName registered."`


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136154973
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String): Unit = {
--- End diff --

The type parameter `T` can be removed


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136118574
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated("use [[sqlQuery()]] instead")
+  def sql(sql: String): Table = {
+sqlQuery(sql)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(sql: String, config: QueryConfig): Unit = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
-// validate the sql query
-val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
+val parsed = planner.parse(sql)
+parsed 

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136156100
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String): Unit = {
+insertInto(tableName, 
QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String, conf: QueryConfig): Unit = {
--- End diff --

The type parameter `T` can be removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136123743
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -1002,4 +1124,28 @@ object TableEnvironment {
 case d: DefinedFieldNames => d.getFieldIndices
 case _ => TableEnvironment.getFieldIndices(tableSource.getReturnType)
   }
+
+  /**
+* Returns field names for a given [[TableSink]].
+*
+* @param tableSink The TableSink to extract field names from.
+* @tparam A The type of the TableSink.
+* @return An array holding the field names.
+*/
+  def getFieldNames[A](tableSink: TableSink[A]): Array[String] = tableSink 
match {
--- End diff --

I don't think we need this method. We can get the field names directly via 
`TableSink.getFieldNames()`


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136115720
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated("use [[sqlQuery()]] instead")
+  def sql(sql: String): Table = {
+sqlQuery(sql)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
--- End diff --

Technically, `SELECT` is a DML statement as well. Is it supported here too?


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136158979
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String): Unit = {
+insertInto(tableName, 
QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String, conf: QueryConfig): Unit = {
+require(tableName != null && !tableName.isEmpty, "tableSink must not 
be null or empty.")
+// validate if the tableSink is registered
+if (!tableEnv.isRegistered(tableName)) {
+  throw TableException("tableSink must be registered.")
+}
+// find if the tableSink is registered //, include validation 
internally
+tableEnv.getTable(tableName) match {
+  case sink: TableSinkTable[_] => {
+// get row type info of upstream table
+val rowType = getRelNode.getRowType
+val srcFieldTypes: Array[TypeInformation[_]] = 
rowType.getFieldList.asScala
+  .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+// column count validation
+if (srcFieldTypes.length != sink.fieldTypes.length) {
+  throw TableException(s"source column count doesn't match target 
table[$tableName]'s.")
+}
+// column type validation, no need to validate field names
+if (sink.fieldTypes.zipWithIndex.exists(f => f._1 != 
srcFieldTypes(f._2))) {
+  throw TableException(s"source row type doesn't match target 
table[$tableName]'s.")
+}
+// emit the table to the configured table sink
+tableEnv.writeToSink(this, sink.tableSink, conf)
+  }
+  case _ =>
+throw new TableException("InsertInto operation needs a registered 
TableSink Table!")
--- End diff --

`s"A Table can only be emitted to a TableSink. $tableName was not 
registered as a TableSink."`


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136113499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated("use [[sqlQuery()]] instead")
+  def sql(sql: String): Table = {
+sqlQuery(sql)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
--- End diff --

Since the method is called `sqlQuery`, I'd rename `sql` to `query`.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136116361
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated("use [[sqlQuery()]] instead")
+  def sql(sql: String): Table = {
+sqlQuery(sql)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
--- End diff --

I would add a comment that the table into which the data is inserted needs to 
be registered as a table sink.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136163943
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String): Unit = {
+insertInto(tableName, 
QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String, conf: QueryConfig): Unit = {
+require(tableName != null && !tableName.isEmpty, "tableSink must not 
be null or empty.")
+// validate if the tableSink is registered
+if (!tableEnv.isRegistered(tableName)) {
+  throw TableException("tableSink must be registered.")
+}
+// find if the tableSink is registered //, include validation 
internally
+tableEnv.getTable(tableName) match {
+  case sink: TableSinkTable[_] => {
+// get row type info of upstream table
+val rowType = getRelNode.getRowType
+val srcFieldTypes: Array[TypeInformation[_]] = 
rowType.getFieldList.asScala
+  .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+// column count validation
+if (srcFieldTypes.length != sink.fieldTypes.length) {
+  throw TableException(s"source column count doesn't match target 
table[$tableName]'s.")
--- End diff --

The schema of the inserted table must exactly match the schema of the target 
table, e.g., inserted table: [a: INTEGER, b: VARCHAR], destination table: 
[a: INTEGER, b: VARCHAR, c: LONG].


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136104550
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -106,6 +106,43 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
--- End diff --

I think we should not require users to correctly configure the field names 
and types of the table sink before registering it.
We can configure it ourselves:

```
val tableSink: TableSink = ...
val fieldNames: Array[String]  = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]]  = Array(Types.INT, Types.LONG, 
Types.STRING)

tableEnv.registerTableSink("example_sink_table", fieldNames, fieldTypes, 
tableSink)
```



---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136109255
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -106,6 +106,43 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
+* {{{
+*   val fieldTypes: Array[TypeInformation[_]]  = Array( #TODO )
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val tableSink: TableSink = new YourTableSinkImpl(fieldTypes, 
Option(fieldNames))
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name  The name under which the [[TableSink]] is 
registered.
+* @param tableSink The [[TableSink]] to register.
+*/
+  override def registerTableSink(name: String, tableSink: TableSink[_]): 
Unit = {
--- End diff --

In our discussion, I proposed to request field names and types when 
registering a `TableSink` instead of requiring users to specify those in the 
`TableSink` itself. Internally, we would configure the passed table sink and 
register the returned copy.

So the signature of the `registerTableSink` method would look like:

```
override def registerTableSink(
  name: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]],
  tableSink: TableSink[_]): Unit
```

I think this is a better approach because it uses the current `TableSink` 
interface and forces users to specify field names and types. Otherwise, we 
need special `TableSink` implementations that request this information in the 
constructor.
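
A call under the proposed signature might then look like this (sketch; the sink 
instance can be any `TableSink` implementation):

```
val sink: TableSink[_] = ...
tableEnv.registerTableSink(
  "example_sink_table",
  Array("a", "b", "c"),
  Array[TypeInformation[_]](Types.STRING, Types.INT, Types.LONG),
  sink)
```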


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136114635
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
--- End diff --

I'd change the error message to
> Unsupported SQL query! sqlQuery() only accepts SQL queries of type 
SELECT, UNION, INTERSECT, EXCEPT, VALUES, WITH, ORDER_BY, and EXPLICIT_TABLE.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136117769
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(sql: String, config: QueryConfig): Unit = {
+   
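
Taken together, the new methods follow a simple delegation shape: the deprecated `sql` forwards to `sqlQuery`, and the one-argument `sqlUpdate` forwards to the two-argument overload with the environment's default `QueryConfig`. A minimal runnable sketch of that shape, with stub types standing in for the real Flink classes (all names here are illustrative only):

```scala
// Stubs standing in for the real Flink types; only the delegation shape matters.
class Table
class QueryConfig

class TableEnvironmentSketch {
  // hypothetical accessor for the environment's default config
  def queryConfig: QueryConfig = new QueryConfig

  @deprecated("use sqlQuery() instead", "1.4")
  def sql(query: String): Table = sqlQuery(query) // old entry point just delegates

  // SELECT-like statements return a Table
  def sqlQuery(sql: String): Table = new Table

  // convenience overload picks up the environment's default config
  def sqlUpdate(sql: String): Unit = sqlUpdate(sql, queryConfig)

  // INSERT INTO ... has no return value
  def sqlUpdate(sql: String, config: QueryConfig): Unit = ()
}
```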

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136115991
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated("use [[sqlQuery()]] instead")
+  def sql(sql: String): Table = {
+sqlQuery(sql)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
--- End diff --

> Currently, only SQL INSERT statements on registered tables are supported.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136110559
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -106,6 +106,43 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
+* {{{
+*   val fieldTypes: Array[TypeInformation[_]]  = Array( #TODO )
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val tableSink: TableSink = new YourTableSinkImpl(fieldTypes, 
Option(fieldNames))
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name  The name under which the [[TableSink]] is 
registered.
+* @param tableSink The [[TableSink]] to register.
+*/
+  override def registerTableSink(name: String, tableSink: TableSink[_]): 
Unit = {
+checkValidTableName(name)
+
+tableSink match {
+  case batchTableSink: BatchTableSink[_] =>
+registerTableInternal(name, new TableSinkTable(batchTableSink))
--- End diff --

register a configured copy of `batchTableSink`:

```
val configuredSink = batchTableSink.configure(fieldNames, fieldTypes)
registerTableInternal(name, new TableSinkTable(configuredSink))
```
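
If I read the `TableSink` contract correctly, the point of going through `configure(fieldNames, fieldTypes)` is that it returns a copy of the sink carrying the final schema, so the catalog entry is fully typed while the sink instance the user passed in stays unconfigured and reusable.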


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136112180
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
--- End diff --

keep `query` instead of `sql`?


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136163423
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String): Unit = {
+insertInto(tableName, 
QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String, conf: QueryConfig): Unit = {
+require(tableName != null && !tableName.isEmpty, "tableSink must not 
be null or empty.")
+// validate if the tableSink is registered
+if (!tableEnv.isRegistered(tableName)) {
+  throw TableException("tableSink must be registered.")
+}
+// find if the tableSink is registered //, include validation 
internally
+tableEnv.getTable(tableName) match {
+  case sink: TableSinkTable[_] => {
+// get row type info of upstream table
+val rowType = getRelNode.getRowType
+val srcFieldTypes: Array[TypeInformation[_]] = 
rowType.getFieldList.asScala
+  .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+// column count validation
+if (srcFieldTypes.length != sink.fieldTypes.length) {
+  throw TableException(s"source column count doesn't match target 
table[$tableName]'s.")
+}
+// column type validation, no need to validate field names
+if (sink.fieldTypes.zipWithIndex.exists(f => f._1 != 
srcFieldTypes(f._2))) {
--- End diff --

simplify to `sink.fieldTypes.zip(srcFieldTypes).exists(t => t._1 != t._2)`
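
The two forms are equivalent; a quick standalone check with stand-in values (strings instead of `TypeInformation`, purely for illustration):

```scala
// Stand-in field "types"; index 1 mismatches on purpose.
val sinkFieldTypes: Array[String] = Array("INT", "STRING")
val srcFieldTypes: Array[String]  = Array("INT", "LONG")

// Original: pair each sink type with its index, then look up the source type by position.
val mismatchByIndex = sinkFieldTypes.zipWithIndex.exists(f => f._1 != srcFieldTypes(f._2))
// Suggested: pair the two arrays directly.
val mismatchByZip = sinkFieldTypes.zip(srcFieldTypes).exists(t => t._1 != t._2)

assert(mismatchByIndex == mismatchByZip) // both true here: STRING vs LONG at index 1
```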


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136124917
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.TableSink
+
+/** Table which defines an external table via a [[TableSink]] */
+class TableSinkTable[T](
+val tableSink: TableSink[T],
+override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+  extends FlinkTable[T](
--- End diff --

I don't think we should extend `FlinkTable` but go directly against 
Calcite's `AbstractTable`. `FlinkTable` deals with many issues like POJO types 
that are not required for table sinks.

`TableSinkTable` can override `getRowType()` by requesting field names and 
types directly from the `TableSink`:

```
override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
  val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
  flinkTypeFactory.buildLogicalRowType(
tableSink.getFieldNames,
tableSink.getFieldTypes,
None,
None)
}
```


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136110776
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -132,6 +132,46 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
+* {{{
+*   val fieldTypes: Array[TypeInformation[_]]  = Array( #TODO )
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val tableSink: TableSink = new YourTableSinkImpl(fieldTypes, 
Option(fieldNames))
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name The name under which the [[TableSink]] is registered.
+* @param tableSink The [[TableSink]] to register.
+*/
+  override def registerTableSink(name: String, tableSink: TableSink[_]): 
Unit = {
--- End diff --

see comments in `BatchTableEnvironment` about configuring the table sink


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136166359
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.sinks.{AppendStreamTableSink, 
BatchTableSink, TableSinkBase}
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+
+object MemoryTableSinkUtil {
+  var results: mutable.MutableList[String] = 
mutable.MutableList.empty[String]
+
+  def clear = {
+MemoryTableSinkUtil.results.clear()
+  }
+
+  final class UnsafeMemoryAppendTableSink(
+  fieldTypes: Array[TypeInformation[_]],
--- End diff --

remove the types and names constructor parameters and configure them internally via 
`TableSink.configure()`
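
A pure-Scala analogue of that pattern, just to show the shape (the real class would extend `TableSinkBase` and let `configure()` do this copying):

```scala
// Sketch: the sink starts schema-less; configure() returns a copied instance with the schema.
class MemorySinkSketch private (val fieldNames: Option[Array[String]]) {
  def this() = this(None)
  def configure(names: Array[String]): MemorySinkSketch =
    new MemorySinkSketch(Some(names)) // copy, never mutate the original
}

val unconfigured = new MemorySinkSketch()
val configured = unconfigured.configure(Array("a", "b", "c"))
assert(unconfigured.fieldNames.isEmpty && configured.fieldNames.isDefined)
```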


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136156414
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String): Unit = {
+insertInto(tableName, 
QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableName Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableName: String, conf: QueryConfig): Unit = {
+require(tableName != null && !tableName.isEmpty, "tableSink must not 
be null or empty.")
--- End diff --

`tableSink` -> `tableName`


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136116568
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated("use [[sqlQuery()]] instead")
+  def sql(sql: String): Table = {
+sqlQuery(sql)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
--- End diff --

We need an exception here for the table sink table. Table sink tables are 
not automatically registered via `toString()`.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136114764
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,133 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated("use [[sqlQuery()]] instead")
+  def sql(sql: String): Table = {
+sqlQuery(sql)
+  }
+
+  /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
--- End diff --

change `sql` to `stmt`?


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136111695
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -417,6 +418,15 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   def registerTableSource(name: String, tableSource: TableSource[_]): Unit
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* @param nameThe name under which the [[TableSink]] is 
registered.
+* @param tableSink The [[TableSink]] to register.
+*/
+  def registerTableSink(name: String, tableSink: TableSink[_]): Unit
--- End diff --

change interface to include field names and types?


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134248050
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

I'm not sure if we should move the `queryConfig` method to TableEnvironment 
or leave it as a utility method. What do you think? @fhueske 


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134185829
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
--- End diff --

sounds reasonable.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134186403
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
--- End diff --

Not only INSERT but also UPDATE/DELETE ...
It'll reject all kinds of SQL except SELECT, EXCEPT, INTERSECT, UNION, 
VALUES, ORDER_BY, EXPLICIT_TABLE.
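
A stand-in sketch of that gate, approximating Calcite's `SqlKind.QUERY` set with plain strings (the exception message in the diff also lists WITH):

```scala
// Kinds accepted by sqlQuery(); everything else (INSERT, UPDATE, DELETE, DDL) is rejected.
val queryKinds = Set("SELECT", "UNION", "INTERSECT", "EXCEPT",
                     "VALUES", "WITH", "ORDER_BY", "EXPLICIT_TABLE")

def isQuery(kind: String): Boolean = queryKinds.contains(kind)

assert(isQuery("SELECT"))
assert(!isQuery("INSERT")) // INSERT is routed to sqlUpdate() instead
```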



---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134186937
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(sql: String, config: QueryConfig): Unit = {

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134184973
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -132,6 +132,44 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
+* {{{
+*   val fieldTypes: Array[TypeInformation[_]]  = Array( #TODO )
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val tableSink: TableSink = new YourTableSinkImpl(fieldTypes, 
Option(fieldNames))
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name The name under which the [[TableSink]] is registered.
+* @param tableSink The [[TableSink]] to register.
+*/
+  override def registerTableSink(name: String, tableSink: TableSink[_]): 
Unit = {
+checkValidTableName(name)
+
+tableSink match {
+  case t @ (_: AppendStreamTableSink[_] | _: UpsertStreamTableSink[_] |
+_: RetractStreamTableSink[_]) =>
+registerTableInternal(name, new TableSinkTable(t))
+  case _ =>
+throw new TableException("BatchTableSink can not be registered in 
StreamTableEnvironment")
--- End diff --

Makes sense to me.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134228841
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CsvSQLTableSink.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.types.Row
+
+class CsvSQLTableSink(
--- End diff --

Yes, I should remove this class since I'm not sure whether to add it as a 
built-in sink table whose schema can be declared.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134185390
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
--- End diff --

yes, the replacement should be mentioned here.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134158258
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

How about adding an unimplemented `queryConfig` method to TableEnvironment to 
avoid this util? 

```
def queryConfig: QueryConfig
```
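
Sketched out with the concrete environments (runnable stubs; `StreamQueryConfig` and `BatchQueryConfig` mirror the config classes used elsewhere in this PR):

```scala
// Stubs standing in for the real config classes.
class QueryConfig extends Serializable
class BatchQueryConfig extends QueryConfig
class StreamQueryConfig extends QueryConfig

abstract class TableEnvironmentSketch {
  def queryConfig: QueryConfig // each environment supplies its own default
}

class StreamTableEnvironmentSketch extends TableEnvironmentSketch {
  override def queryConfig: StreamQueryConfig = new StreamQueryConfig
}

class BatchTableEnvironmentSketch extends TableEnvironmentSketch {
  override def queryConfig: BatchQueryConfig = new BatchQueryConfig
}
```

With that in place, `sqlUpdate` and `Table.insertInto` could call `tableEnv.queryConfig` directly instead of pattern-matching on the environment type.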


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134158670
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -132,6 +132,44 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
+* {{{
+*   val fieldTypes: Array[TypeInformation[_]]  = Array( #TODO )
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val tableSink: TableSink = new YourTableSinkImpl(fieldTypes, 
Option(fieldNames))
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name The name under which the [[TableSink]] is registered.
+* @param tableSink The [[TableSink]] to register.
+*/
+  override def registerTableSink(name: String, tableSink: TableSink[_]): 
Unit = {
+checkValidTableName(name)
+
+tableSink match {
+  case t @ (_: AppendStreamTableSink[_] | _: UpsertStreamTableSink[_] |
+_: RetractStreamTableSink[_]) =>
+registerTableInternal(name, new TableSinkTable(t))
+  case _ =>
+throw new TableException("BatchTableSink can not be registered in 
StreamTableEnvironment")
--- End diff --

There may be more kinds of TableSink in the future, so I would suggest 
changing this exception message to `Only AppendStreamTableSink, 
UpsertStreamTableSink and RetractStreamTableSink can be registered in 
StreamTableEnvironment`.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134162144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
--- End diff --

Is this exception used to reject an INSERT INTO statement? 


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134177144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(sql: String, config: QueryConfig): Unit = {
+   

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134173548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableSink Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableSink: String): Unit = {
+insertInto(tableSink, 
QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableSink Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableSink: String, conf: QueryConfig): Unit = {
--- End diff --

change the parameter name `tableSink` to `tableName` ? 


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134180276
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CsvSQLTableSink.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.types.Row
+
+class CsvSQLTableSink(
--- End diff --

The `CsvSQLTableSink` is never used; should we remove it? 


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134161822
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
--- End diff --

I think the deprecated `sql` method should delegate calls to `sqlQuery`.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134173572
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableSink Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableSink: String): Unit = {
--- End diff --

Change the parameter name `tableSink` to `tableName`?


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134160516
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
--- End diff --

Please add a `deprecated` comment to the javadoc to tell users which new API 
should be used instead.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-18 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133920847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
--- End diff --

There is more than one place that needs to get the default QueryConfig from 
the table environment.
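
A sketch of such a helper on the companion object (the name follows the 
`QueryConfig.getQueryConfigFromTableEnv(this)` call that appears later in 
this thread; the exact match arms are my assumption):

{{{
object QueryConfig {
  // lives in org.apache.flink.table.api, so no extra imports are needed
  private[table] def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig =
    tableEnv match {
      case s: StreamTableEnvironment => s.queryConfig
      case _: BatchTableEnvironment  => new BatchQueryConfig
      case _                         => null
    }
}
}}}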


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-18 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133921697
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
--- End diff --

SqlParser.parseStmt() actually calls SqlParser.parseQuery(), so the two are 
the same and cannot help us distinguish the statement type. That is why 
SqlKind is used here: SqlKind.QUERY consists of SELECT, EXCEPT, INTERSECT, 
UNION, VALUES, ORDER_BY, and EXPLICIT_TABLE.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127446535
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -319,7 +337,7 @@ abstract class BatchTableEnvironment(
 * @tparam A The type of the resulting [[DataSet]].
 * @return The [[DataSet]] that corresponds to the translated [[Table]].
 */
-  protected def translate[A](table: Table)(implicit tpe: 
TypeInformation[A]): DataSet[A] = {
+  def translate[A](table: Table)(implicit tpe: TypeInformation[A]): 
DataSet[A] = {
--- End diff --

Why did you make this method public?
I think it should be hidden from users.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127451116
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -488,19 +499,50 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
-* @return The result of the query as Table.
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
 */
-  def sql(query: String): Table = {
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
 
-new Table(this, LogicalRelNode(relational.rel))
+// separate source translation if DML insert, considering sink 
operation has no output but all
+// RelNodes' translation should return a DataSet/DataStream, and 
Calcite's TableModify
+// defines output for DML(INSERT,DELETE,UPDATE) is a RowCount column 
of BigInt type. This a
+// work around since we have no conclusion for the output type 
definition of sink operation
+// (compare to traditional SQL insert).
+if (parsed.isInstanceOf[SqlInsert]) {
+  val insert = parsed.asInstanceOf[SqlInsert]
--- End diff --

The `isInstanceOf` / `asInstanceOf` can be more elegantly implemented with 
pattern matching.
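
Something like this (sketch mirroring the quoted diff; the partial-column 
validation is omitted for brevity):

{{{
parsed match {
  case insert: SqlInsert =>
    // INSERT INTO: resolve the registered sink table and write to it
    val targetName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
    getTable(targetName) match {
      case sinkTable: TableSinkTable[_] =>
        writeToSink(
          new Table(this, LogicalRelNode(planner.rel(insert.getSource).rel)),
          sinkTable.tableSink,
          QueryConfig.getQueryConfigFromTableEnv(this))
      case _ =>
        throw new TableException(
          "SQL DML INSERT operation needs a registered TableSink table!")
    }
    // sql() currently returns null for DML inserts
    null
  case _ =>
    // plain query: validate and transform to a relational tree as before
    val validated = planner.validate(parsed)
    new Table(this, LogicalRelNode(planner.rel(validated).rel))
}
}}}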


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127460882
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CsvSQLTableSink.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
+import org.apache.flink.types.Row
+
+class CsvSQLTableSink(
--- End diff --

I think we should use a `TableSink` which is backed by a `static` `List` 
object for tests because it is faster and more reliable.
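
A rough sketch of such a sink (assuming the `TableSinkBase` and 
`BatchTableSink` interfaces from flink-table; only meaningful for local test 
execution, where all subtasks share one JVM):

{{{
import java.util.{ArrayList, Collections, List => JList}

import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.io.LocalCollectionOutputFormat
import org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase}
import org.apache.flink.types.Row

object MemoryTableSink {
  // static, lock-guarded storage shared by all sink subtasks in the test JVM
  val results: JList[Row] = Collections.synchronizedList(new ArrayList[Row]())
}

class MemoryTableSink extends TableSinkBase[Row] with BatchTableSink[Row] {

  override protected def copy: TableSinkBase[Row] = new MemoryTableSink

  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
    // write straight into the static list; valid only for single-JVM tests
    dataSet.output(new LocalCollectionOutputFormat[Row](MemoryTableSink.results))
  }
}
}}}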


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127456171
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
--- End diff --

I don't think we have to add this. 


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127459823
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala
 ---
@@ -113,4 +116,31 @@ class TableWithSQLITCase(
 val results = result.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  /** test insert into **/
+  @Test
+  def testInsertIntoTable(): Unit = {
+val tmpFile = File.createTempFile("flink-sql-table-sink-test1", ".tmp")
+tmpFile.deleteOnExit()
+val path = tmpFile.toURI.toString
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val t = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("sourceTable", t)
+
+val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+val fieldNames = Seq("d", "e", "f").toArray
+val sink = new CsvSQLTableSink(path, fieldTypes, fieldNames, ",")
--- End diff --

We could also implement a `TableSink` which stores results in a `static 
List` guarded by a lock to synchronize the inserts. This would make the test 
faster and more reliable (going to the filesystem is a common cause of flaky 
tests).


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127460421
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnsupportedSqlTest.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import java.io.File
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamTestData, 
StreamingWithStateTestBase}
+import org.junit.Test
+import org.apache.flink.table.utils.CsvSQLTableSink
+
+class UnsupportedSqlTest extends StreamingWithStateTestBase {
--- End diff --

I think this test does not need to extend `StreamingWithStateTestBase`. The 
test base starts a Flink system, which makes the test quite expensive. This 
test will fail before the Flink system can be used.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127449330
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -488,19 +499,50 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
-* @return The result of the query as Table.
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
 */
-  def sql(query: String): Table = {
+  def sql(sql: String): Table = {
--- End diff --

I'm not sure if we should add this functionality to the `sql()` method. The 
`sql()` method returns a `Table` which I would not expect if I define an 
`INSERT INTO` statement. Returning `null` is not a good idea in my opinion.

I would rather add a new method, maybe called `sqlInsert()`. The method 
would check the statement type and could also give a meaningful error message 
with a hint about which method should be used instead.
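
Roughly something like this (sketch; the method name follows the proposal 
above, while the error message wording and the `translateInsert` helper are 
my assumptions):

{{{
def sqlInsert(sql: String): Unit = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
  planner.parse(sql) match {
    case insert: SqlInsert =>
      // translateInsert() is a hypothetical helper holding the sink
      // validation and translation logic currently inlined in sql()
      translateInsert(insert, planner)
    case _ =>
      throw new TableException(
        "sqlInsert() only accepts INSERT statements. " +
          "Please use sql() to evaluate SELECT queries.")
  }
}
}}}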

What do you think @lincoln-lil?


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127448161
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -131,6 +131,25 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
--- End diff --

Please add an example query.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127456022
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -488,19 +499,50 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
-* @return The result of the query as Table.
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
 */
-  def sql(query: String): Table = {
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
 
-new Table(this, LogicalRelNode(relational.rel))
+// separate source translation if DML insert, considering sink 
operation has no output but all
+// RelNodes' translation should return a DataSet/DataStream, and 
Calcite's TableModify
+// defines output for DML(INSERT,DELETE,UPDATE) is a RowCount column 
of BigInt type. This a
+// work around since we have no conclusion for the output type 
definition of sink operation
+// (compare to traditional SQL insert).
+if (parsed.isInstanceOf[SqlInsert]) {
+  val insert = parsed.asInstanceOf[SqlInsert]
+  // validate sink table
+  val targetName = 
insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
+  val targetTable = getTable(targetName)
+  if (null == targetTable || 
!targetTable.isInstanceOf[TableSinkTable[_]]) {
+throw new TableException("SQL DML INSERT operation need a 
registered TableSink Table!")
+  }
+  // validate unsupported partial insertion to sink table
+  val sinkTable = targetTable.asInstanceOf[TableSinkTable[_]]
+  if (null != insert.getTargetColumnList && 
insert.getTargetColumnList.size() !=
+sinkTable.fieldTypes.length) {
+throw new TableException(
+  "SQL DML INSERT do not support insert partial columns of the 
target table due to table " +
+"columns haven’t nullable property definition for now!")
+  }
+
+  writeToSink(
+new Table(this, LogicalRelNode(planner.rel(insert.getSource).rel)),
+sinkTable.tableSink,
+QueryConfig.getQueryConfigFromTableEnv(this)
--- End diff --

This will return the default configuration. If we move `INSERT INTO` 
queries into a special method (like `sqlInsert()`), the method could have an 
additional `QueryConfig` parameter.

Otherwise, users won't be able to tweak the execution of a streaming query.
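
For instance (hypothetical signatures, not part of this PR):

{{{
// hypothetical overloads: one uses the default config, the other lets the
// user pass a QueryConfig which would be forwarded to writeToSink(...)
def sqlInsert(sql: String): Unit =
  sqlInsert(sql, QueryConfig.getQueryConfigFromTableEnv(this))

def sqlInsert(sql: String, config: QueryConfig): Unit = ???
}}}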


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127452041
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -488,19 +499,50 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
-* @return The result of the query as Table.
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
 */
-  def sql(query: String): Table = {
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
-// transform to a relational tree
-val relational = planner.rel(validated)
 
-new Table(this, LogicalRelNode(relational.rel))
+// separate source translation if DML insert, considering sink 
operation has no output but all
+// RelNodes' translation should return a DataSet/DataStream, and 
Calcite's TableModify
+// defines output for DML(INSERT,DELETE,UPDATE) is a RowCount column 
of BigInt type. This a
+// work around since we have no conclusion for the output type 
definition of sink operation
+// (compare to traditional SQL insert).
+if (parsed.isInstanceOf[SqlInsert]) {
+  val insert = parsed.asInstanceOf[SqlInsert]
+  // validate sink table
+  val targetName = 
insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
+  val targetTable = getTable(targetName)
+  if (null == targetTable || 
!targetTable.isInstanceOf[TableSinkTable[_]]) {
+throw new TableException("SQL DML INSERT operation need a 
registered TableSink Table!")
+  }
+  // validate unsupported partial insertion to sink table
+  val sinkTable = targetTable.asInstanceOf[TableSinkTable[_]]
+  if (null != insert.getTargetColumnList && 
insert.getTargetColumnList.size() !=
+sinkTable.fieldTypes.length) {
--- End diff --

I think there is a mismatch between the `TableSink` interface and how it is 
used as an output table here. An instance of `TableSink` has no predefined 
columns but gets those when it is configured (`TableSink.configure()`) before 
it emits a Table. 

In the current state, a `TableSink` is configured to emit data according to 
the query result, i.e., the schema of the emitted table depends on the query 
result. However, for `INSERT INTO` the table schema is defined beforehand and 
Calcite validates that the schema of the query is compliant with the schema of 
the table to write to. 

I think we should change the `TableSink` interface, but right now I don't 
think we should use `TableSink` like that.
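
To illustrate the two conflicting usage patterns (sketch; `sink`, 
`queryFieldNames`, and `queryFieldTypes` are placeholders):

{{{
// writeToSink(): the sink adopts whatever schema the translated query
// produces (queryFieldNames / queryFieldTypes stand for that schema)
val configuredSink = sink.configure(queryFieldNames, queryFieldTypes)

// INSERT INTO: the sink table's schema is fixed at registration time and
// Calcite validates the query against it, so there is no natural point at
// which configure() could still adapt the sink to the query's schema
}}}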


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-07-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r127448060
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -106,6 +106,24 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
--- End diff --

I think an example query would be good.


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-05-05 Thread lincoln-lil
GitHub user lincoln-lil opened a pull request:

https://github.com/apache/flink/pull/3829

[FLINK-6442] [table] Extend TableAPI Support Sink Table Registration …

Support Sink Table Registration and ‘insert into’ Clause in SQL:
1. support registering a sink table (analogous to source table registration; 
validation is done against the registered table)
2. support the ‘insert into’ clause in SQL
3. inserting only a subset of the target table's columns is not supported for 
now, because table columns have no nullable property definition yet

I first tried to translate the whole SQL statement via converter rules. 
However, in Flink a sink operation has no output, while the translation of 
every RelNode must return a DataSet/DataStream, and Calcite's TableModify 
defines the output of DML (INSERT, DELETE, UPDATE) as a RowCount column of 
BigInt type.
Therefore the source query is translated separately for DML inserts. This is 
a workaround, since we have no conclusion yet on the output type definition 
of a sink operation (compared to a traditional SQL insert).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lincoln-lil/flink FLINK-6442

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3829.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3829


commit 4aec1f6d6167ab7f0be9ace3d9f51a877d21ef46
Author: lincoln-lil 
Date:   2017-05-04T09:52:34Z

[FLINK-6442] [table] Extend TableAPI Support Sink Table Registration and 
‘insert into’ Clause in SQL




---