[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544495#comment-16544495 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6201


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes we have in mind:
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService.
> 2) Add a common property called "type" with values (source, sink, and both) 
> for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use tableType to 
> identify whether a table is a source or a sink.
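A minimal sketch of the type dispatch proposed in 2) and 3), assuming the table properties have already been parsed from the YAML environment file into a map. The class, enum, and property-key names below are illustrative, not Flink's actual API:

```java
import java.util.Map;

public class TableTypeDispatch {
    enum TableType { SOURCE, SINK, BOTH }

    // Reads the proposed common "type" property and decides which kind of
    // table to instantiate. The accepted values follow the description
    // above (source, sink, both); everything else here is an assumption.
    static TableType dispatch(Map<String, String> tableProperties) {
        String type = tableProperties.get("type");
        if (type == null) {
            throw new IllegalArgumentException("The 'type' attribute of a table is missing.");
        }
        switch (type) {
            case "source": return TableType.SOURCE;
            case "sink":   return TableType.SINK;
            case "both":   return TableType.BOTH;
            default:
                throw new IllegalArgumentException(
                    "Invalid table 'type' attribute value '" + type + "'.");
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(Map.of("name", "t1", "type", "sink"))); // prints "SINK"
    }
}
```

An unknown or missing type fails fast, mirroring the validation the SQL client performs on its environment file.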



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536522#comment-16536522 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 closed the pull request at:

https://github.com/apache/flink/pull/6201




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-08 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536523#comment-16536523 ]

ASF GitHub Bot commented on FLINK-8866:
---

GitHub user suez1224 reopened a pull request:

https://github.com/apache/flink/pull/6201

[FLINK-8866][Table API & SQL] Add support for unified table sink 
instantiation

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

Add interfaces to support unified table sink configuration and 
instantiation. Consolidate table source and table sink configuration and 
instantiation.


## Brief change log

- Consolidate table sink and table source instantiation with 
TableConnectorFactory{Service}.
- Add support to register a Calcite table with both tableSource and 
tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.

## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for registering a table source and a table sink 
under the same name.
  - Added integration tests for the INSERT INTO command in the SQL client.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-8866-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6201


commit d436041fda022f405fa3a1d497b95d2ff0934ce5
Author: Shuyi Chen 
Date:   2018-06-19T19:00:34Z

- Add unified table sink instantiation.
- Consolidate table sink and table source instantiation.
- Add support to register a Calcite table with both tableSource and 
tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.

commit d70d033abec0806f1723b18f7bcbab1f60ec7280
Author: Shuyi Chen 
Date:   2018-06-28T18:30:21Z

comment fixes

commit 0649359106ddfa003c44e3d0221f7f52ac507593
Author: Shuyi Chen 
Date:   2018-07-06T23:31:05Z

refactor:
1) add TableFactoryDiscoverable trait
2) add util for handling rowtime/proctime for table schema and unittests






[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-07 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535682#comment-16535682 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200806113
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala ---
@@ -39,9 +39,12 @@ abstract class PhysicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
-      case _: StreamTableSourceTable[_] => true
-      case _: BatchTableSourceTable[_] => false
+    val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match {
+      case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
+        case _: StreamTableSourceTable[_] => true
--- End diff --

good catch.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-07 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535680#comment-16535680 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200806045
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala ---
@@ -27,6 +28,8 @@ class FileSystem extends ConnectorDescriptor(
   CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) {
 
   private var path: Option[String] = None
+  private var numFiles: Option[Int] = None
--- End diff --

This is for the CsvTableSinkFactory.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-07 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535678#comment-16535678 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200805902
  
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
@@ -56,23 +58,44 @@ public Environment() {
 		return tables;
 	}
 
+	private static TableDescriptor create(String name, Map<String, Object> config) {
+		if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
+			throw new SqlClientException("The 'type' attribute of a table is missing.");
+		}
+		final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE());
+		if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+			return new Source(name, ConfigUtil.normalizeYaml(config));
+		} else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+			return new Sink(name, ConfigUtil.normalizeYaml(config));
+		} else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+			return new SourceSink(name, ConfigUtil.normalizeYaml(config));
+		}
+		return null;
+	}
+
 	public void setTables(List<Map<String, Object>> tables) {
 		this.tables = new HashMap<>(tables.size());
 		tables.forEach(config -> {
-			if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-				throw new SqlClientException("The 'type' attribute of a table is missing.");
+			if (!config.containsKey(NAME)) {
+				throw new SqlClientException("The 'name' attribute of a table is missing.");
 			}
-			if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
-				config.remove(TableDescriptorValidator.TABLE_TYPE());
-				final Source s = Source.create(config);
-				if (this.tables.containsKey(s.getName())) {
-					throw new SqlClientException("Duplicate source name '" + s + "'.");
-				}
-				this.tables.put(s.getName(), s);
-			} else {
+			final Object name = config.get(NAME);
+			if (name == null || !(name instanceof String) || ((String) name).length() <= 0) {
+				throw new SqlClientException("Invalid table name '" + name + "'.");
+			}
+			final String tableName = (String) name;
+			final Map<String, Object> properties = new HashMap<>(config);
+			properties.remove(NAME);
+
+			TableDescriptor tableDescriptor = create(tableName, properties);
+			if (null == tableDescriptor) {
 				throw new SqlClientException(
-					"Invalid table 'type' attribute value, only 'source' is supported");
+					"Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
+			}
+			if (this.tables.containsKey(tableName)) {
+				throw new SqlClientException("Duplicate table name '" + tableName + "'.");
--- End diff --

The current implementation allows only one source and one sink per table name.



[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-07 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535676#comment-16535676 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200805881
  
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
@@ -56,23 +58,44 @@ public Environment() {
 		return tables;
 	}
 
+	private static TableDescriptor create(String name, Map<String, Object> config) {
+		if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
+			throw new SqlClientException("The 'type' attribute of a table is missing.");
+		}
+		final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE());
+		if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+			return new Source(name, ConfigUtil.normalizeYaml(config));
+		} else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+			return new Sink(name, ConfigUtil.normalizeYaml(config));
+		} else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+			return new SourceSink(name, ConfigUtil.normalizeYaml(config));
+		}
+		return null;
+	}
+
 	public void setTables(List<Map<String, Object>> tables) {
 		this.tables = new HashMap<>(tables.size());
 		tables.forEach(config -> {
-			if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-				throw new SqlClientException("The 'type' attribute of a table is missing.");
+			if (!config.containsKey(NAME)) {
+				throw new SqlClientException("The 'name' attribute of a table is missing.");
 			}
-			if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
-				config.remove(TableDescriptorValidator.TABLE_TYPE());
-				final Source s = Source.create(config);
-				if (this.tables.containsKey(s.getName())) {
-					throw new SqlClientException("Duplicate source name '" + s + "'.");
-				}
-				this.tables.put(s.getName(), s);
-			} else {
+			final Object name = config.get(NAME);
+			if (name == null || !(name instanceof String) || ((String) name).length() <= 0) {
+				throw new SqlClientException("Invalid table name '" + name + "'.");
+			}
+			final String tableName = (String) name;
+			final Map<String, Object> properties = new HashMap<>(config);
+			properties.remove(NAME);
+
+			TableDescriptor tableDescriptor = create(tableName, properties);
+			if (null == tableDescriptor) {
 				throw new SqlClientException(
-					"Invalid table 'type' attribute value, only 'source' is supported");
+					"Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
--- End diff --

Good catch, thanks.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534329#comment-16534329 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200521222
  
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
@@ -56,23 +58,44 @@ public Environment() {
 		return tables;
 	}
 
+	private static TableDescriptor create(String name, Map<String, Object> config) {
+		if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
+			throw new SqlClientException("The 'type' attribute of a table is missing.");
+		}
+		final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE());
+		if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+			return new Source(name, ConfigUtil.normalizeYaml(config));
+		} else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+			return new Sink(name, ConfigUtil.normalizeYaml(config));
+		} else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+			return new SourceSink(name, ConfigUtil.normalizeYaml(config));
+		}
+		return null;
+	}
+
 	public void setTables(List<Map<String, Object>> tables) {
 		this.tables = new HashMap<>(tables.size());
 		tables.forEach(config -> {
-			if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-				throw new SqlClientException("The 'type' attribute of a table is missing.");
+			if (!config.containsKey(NAME)) {
+				throw new SqlClientException("The 'name' attribute of a table is missing.");
 			}
-			if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
-				config.remove(TableDescriptorValidator.TABLE_TYPE());
-				final Source s = Source.create(config);
-				if (this.tables.containsKey(s.getName())) {
-					throw new SqlClientException("Duplicate source name '" + s + "'.");
-				}
-				this.tables.put(s.getName(), s);
-			} else {
+			final Object name = config.get(NAME);
+			if (name == null || !(name instanceof String) || ((String) name).length() <= 0) {
+				throw new SqlClientException("Invalid table name '" + name + "'.");
+			}
+			final String tableName = (String) name;
+			final Map<String, Object> properties = new HashMap<>(config);
+			properties.remove(NAME);
+
+			TableDescriptor tableDescriptor = create(tableName, properties);
+			if (null == tableDescriptor) {
 				throw new SqlClientException(
-					"Invalid table 'type' attribute value, only 'source' is supported");
+					"Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
+			}
+			if (this.tables.containsKey(tableName)) {
+				throw new SqlClientException("Duplicate table name '" + tableName + "'.");
--- End diff --

If only `"source"` and `"sink"` are allowed, should the same name with different types be 
able to co-exist, e.g. `{"name": "t1", "type": "source"}` and `{"name": "t1", 
"type": "sink"}`? This follows up on the previous comment: I think we just need 
one of the two options, either should work.



[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534332#comment-16534332 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200524249
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
--- End diff --

+1 Should be able to unify




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534334#comment-16534334 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200524110
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories for the given properties.
+  * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

+1 
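The factory service being unified above can be sketched as follows. In Flink the candidate factories are discovered via `java.util.ServiceLoader`; to keep this sketch self-contained they are passed in as a plain list, and the `TableFactory`/`requiredContext` names are illustrative assumptions, not the PR's exact API:

```java
import java.util.List;
import java.util.Map;

public class FactoryLookup {
    // Illustrative stand-in for the factory SPI: a factory declares the
    // property values (e.g. connector type, table type) it can handle.
    interface TableFactory {
        Map<String, String> requiredContext();
    }

    // Picks the single factory whose required context matches the given
    // properties; fails on no match or an ambiguous match, as the real
    // service does. Candidates would normally come from ServiceLoader.
    static TableFactory find(List<TableFactory> candidates, Map<String, String> properties) {
        TableFactory match = null;
        for (TableFactory f : candidates) {
            boolean ok = f.requiredContext().entrySet().stream()
                    .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
            if (!ok) {
                continue;
            }
            if (match != null) {
                throw new IllegalStateException("Ambiguous factories for " + properties);
            }
            match = f;
        }
        if (match == null) {
            throw new IllegalStateException("No factory matches " + properties);
        }
        return match;
    }

    public static void main(String[] args) {
        TableFactory csvSource = () -> Map.of("connector.type", "filesystem", "type", "source");
        TableFactory csvSink = () -> Map.of("connector.type", "filesystem", "type", "sink");
        TableFactory chosen = find(
                List.of(csvSource, csvSink),
                Map.of("connector.type", "filesystem", "type", "sink"));
        System.out.println(chosen == csvSink); // prints "true"
    }
}
```

Parameterizing the service over the produced type (as `TableConnectorFactoryService[T]` does) lets the same matching logic serve both sources and sinks.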




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534328#comment-16534328 ]

ASF GitHub Bot commented on FLINK-8866:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200521017
  
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
@@ -56,23 +58,44 @@ public Environment() {
 		return tables;
 	}
 
+	private static TableDescriptor create(String name, Map<String, Object> config) {
+		if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
+			throw new SqlClientException("The 'type' attribute of a table is missing.");
+		}
+		final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE());
+		if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+			return new Source(name, ConfigUtil.normalizeYaml(config));
+		} else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+			return new Sink(name, ConfigUtil.normalizeYaml(config));
+		} else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+			return new SourceSink(name, ConfigUtil.normalizeYaml(config));
+		}
+		return null;
+	}
+
 	public void setTables(List<Map<String, Object>> tables) {
 		this.tables = new HashMap<>(tables.size());
 		tables.forEach(config -> {
-			if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-				throw new SqlClientException("The 'type' attribute of a table is missing.");
+			if (!config.containsKey(NAME)) {
+				throw new SqlClientException("The 'name' attribute of a table is missing.");
 			}
-			if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
-				config.remove(TableDescriptorValidator.TABLE_TYPE());
-				final Source s = Source.create(config);
-				if (this.tables.containsKey(s.getName())) {
-					throw new SqlClientException("Duplicate source name '" + s + "'.");
-				}
-				this.tables.put(s.getName(), s);
-			} else {
+			final Object name = config.get(NAME);
+			if (name == null || !(name instanceof String) || ((String) name).length() <= 0) {
+				throw new SqlClientException("Invalid table name '" + name + "'.");
+			}
+			final String tableName = (String) name;
+			final Map<String, Object> properties = new HashMap<>(config);
+			properties.remove(NAME);
+
+			TableDescriptor tableDescriptor = create(tableName, properties);
+			if (null == tableDescriptor) {
 				throw new SqlClientException(
-					"Invalid table 'type' attribute value, only 'source' is supported");
+					"Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
--- End diff --

missing `both` ?
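
The dispatch the comment refers to accepts three 'type' values but the error message only names two. A minimal plain-Java sketch of the same dispatch, covering the `both` case as well (the class name, `classify`, and the string constants are illustrative stand-ins for `TableDescriptorValidator`, not Flink APIs):

```java
import java.util.HashMap;
import java.util.Map;

class TableTypeDispatch {
    // Illustrative stand-ins for the TableDescriptorValidator constants.
    static final String TABLE_TYPE = "type";
    static final String TYPE_SOURCE = "source";
    static final String TYPE_SINK = "sink";
    static final String TYPE_BOTH = "both";

    /** Maps the 'type' property to a descriptor kind, handling all three values. */
    static String classify(Map<String, ?> config) {
        Object type = config.get(TABLE_TYPE);
        if (!(type instanceof String)) {
            throw new IllegalArgumentException("The 'type' attribute of a table is missing.");
        }
        switch ((String) type) {
            case TYPE_SOURCE: return "Source";
            case TYPE_SINK:   return "Sink";
            case TYPE_BOTH:   return "SourceSink";
            default:
                throw new IllegalArgumentException(
                    "Invalid table 'type' attribute value, only 'source', 'sink' or 'both' is supported");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(TABLE_TYPE, "both");
        System.out.println(classify(config)); // prints "SourceSink"
    }
}
```

Exhausting the three values in one place keeps the error message in sync with what the dispatch actually accepts.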


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534330#comment-16534330
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200522491
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -160,10 +173,34 @@ abstract class BatchTableEnvironment(
       throw new TableException("Same number of field names and types required.")
     }
 
-    tableSink match {
+    val configuredSink = tableSink.configure(fieldNames, fieldTypes)
+    registerTableSinkInternal(name, configuredSink)
+  }
+
+  def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = {
--- End diff --

could probably move this to based class `TableEnvironment` ?
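
The two-step pattern in the diff (specialize the sink via `configure`, then register the configured copy) can be illustrated with a small Flink-free sketch; `SimpleSink` and its members are hypothetical stand-ins for `TableSink#configure`, not the real interface:

```java
import java.util.Arrays;

// Hypothetical stand-in for an unconfigured sink: configure() returns a
// specialized copy carrying the field names/types; the original is untouched.
class SimpleSink {
    final String[] fieldNames;
    final Class<?>[] fieldTypes;

    SimpleSink(String[] fieldNames, Class<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    SimpleSink configure(String[] names, Class<?>[] types) {
        // Mirrors the "Same number of field names and types required." check above.
        if (names.length != types.length) {
            throw new IllegalArgumentException("Same number of field names and types required.");
        }
        return new SimpleSink(names, types);
    }

    public static void main(String[] args) {
        SimpleSink unconfigured = new SimpleSink(new String[0], new Class<?>[0]);
        SimpleSink configured = unconfigured.configure(
            new String[]{"name", "count"}, new Class<?>[]{String.class, Long.class});
        System.out.println(Arrays.toString(configured.fieldNames)); // prints "[name, count]"
    }
}
```

Because only the already-configured copy is registered, the registration step no longer depends on batch- or stream-specific details, which is why moving it to a shared base class is plausible.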




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534333#comment-16534333
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200523824
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+
+class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]],
--- End diff --

Huge +1. My understanding is this will be the overall class to hold a table 
source, sink or both. `TableSourceSinkTable` seems redundant. 




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534331#comment-16534331
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200524149
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories for the given properties.
+  * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

also just `TableFactoryService` ?
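
The lookup pattern behind such a service (whatever its final name) can be sketched in plain Java. This is a hypothetical miniature, not Flink code: `MiniTableFactory`, `requiredContext`, and `find` are illustrative names; in Flink the candidates would come from `java.util.ServiceLoader` over `META-INF/services` entries rather than an explicit list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Each factory declares the properties it matches on; the service picks the
// unique factory whose required context is contained in the given properties.
interface MiniTableFactory {
    Map<String, String> requiredContext();
}

class MiniFactoryService {
    static MiniTableFactory find(List<MiniTableFactory> candidates, Map<String, String> properties) {
        List<MiniTableFactory> matches = new ArrayList<>();
        for (MiniTableFactory factory : candidates) {
            if (properties.entrySet().containsAll(factory.requiredContext().entrySet())) {
                matches.add(factory);
            }
        }
        if (matches.isEmpty()) {
            throw new IllegalArgumentException("No factory matches the given properties.");
        }
        if (matches.size() > 1) {
            throw new IllegalArgumentException("Ambiguous factories for the given properties.");
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        MiniTableFactory kafka = () -> Map.of("connector.type", "kafka");
        MiniTableFactory filesystem = () -> Map.of("connector.type", "filesystem");
        MiniTableFactory found = find(
            List.of(kafka, filesystem),
            Map.of("connector.type", "kafka", "format.type", "json"));
        System.out.println(found == kafka); // prints "true"
    }
}
```

The no-match and ambiguous-match failures correspond to the `NoMatchingTableSourceException` and `AmbiguousTableSourceException` cases visible in the removed imports above.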




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534327#comment-16534327
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200523261
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
-  * describe the desired table source. The factory allows for matching to the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

+1. I think the most baffling point so far has been the `Table*Connector*Factory` part :-)




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532428#comment-16532428
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200045769
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
-  * describe the desired table source. The factory allows for matching to the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

@suez1224 Actually, I liked `CREATE TABLE` because it is closer to SQL. The reason I proposed `TableFactory` is that the factory does much more than just construct a connector. It also performs schema validation, format discovery, and so on.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531930#comment-16531930
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199951101
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
-  * describe the desired table source. The factory allows for matching to the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

Sounds good. Also, I've updated the DDL design doc to call it TABLE CONNECTOR, which I think is clearer.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531920#comment-16531920
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/6201
  
@twalthr @fhueske sounds good to me. We can do that in a follow-up issue 
for `from-source`, and we will not support `from-source` in this PR.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531237#comment-16531237
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6201
  
I agree with @fhueske. Let's do the `from-source` in a follow-up issue. I 
will open a PR soon for FLINK-8558 which separates connector and format. For 
this I also introduced a method `KafkaTableSource#supportsKafkaTimestamps`. The 
`KafkaTableFactory` can read this property and throw an exception accordingly.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531114#comment-16531114
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6201
  
Hi @suez1224, that sounds good overall. :-)

A few comments:

- I would not add a user-facing property `connector.support-timestamp` 
because a user chooses that by choosing the connector type. Whether the 
connector supports writing a system timestamp can be an internal 
field/annotation/interface of the `TableSink` that is generated from the 
properties.
- Copying the timestamp to the StreamRecord timestamp field can be done 
with a process function. Actually, we do that already when converting a Table 
into a DataStream. Setting the flag in the Kafka TableSink should be easy.
- Not sure if `from-source` needs to be supported by the initial version. 
We could just implement `from-field` for now, and handle `from-source` as a 
follow up issue. Since we are approaching feature freeze, I think this might be 
a good idea at this point.

What do you think?
Fabian
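
The second bullet (copying a row field into the record timestamp via a process function) can be sketched per record without any Flink APIs. `TimestampCopy`, `ROWTIME_INDEX`, and the method names are illustrative, assuming the rowtime sits at a known position in the row:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of what a timestamp-copying operator would do per record:
// read the rowtime field, use it as the record timestamp, and (for the
// from-source case) drop it from the payload before handing it to the sink.
class TimestampCopy {
    static final int ROWTIME_INDEX = 1; // illustrative position of the rowtime field

    static long extractTimestamp(Object[] row) {
        return (Long) row[ROWTIME_INDEX];
    }

    static Object[] dropRowtimeField(Object[] row) {
        List<Object> remaining = new ArrayList<>(Arrays.asList(row));
        remaining.remove(ROWTIME_INDEX);
        return remaining.toArray();
    }

    public static void main(String[] args) {
        Object[] row = {"order-42", 1530000000000L, 9.99};
        long ts = extractTimestamp(row);          // becomes the StreamRecord timestamp
        Object[] payload = dropRowtimeField(row); // what the sink actually writes
        System.out.println(ts + " / " + Arrays.toString(payload));
    }
}
```

This mirrors the existing Table-to-DataStream conversion path mentioned above, where the timestamp is already copied into the stream record.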




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530939#comment-16530939
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/6201
  
@fhueske @twalthr thanks for the comments. For `from-source`, the only systems I know of are Kafka10 and Kafka11, which support writing a record together with its timestamp. To support `from-source` in a table sink, I think we can do the following:
1) Add a connector property, e.g. connector.support-timestamp. Only if connector.support-timestamp is true will we allow the sink table schema to contain a field with rowtime type `from-source`; otherwise, an exception is thrown.
2) If the condition in 1) is satisfied, we create the corresponding rowtime field in the sink table schema with type LONG. In TableEnvironment.insertInto(), we validate the sink schema against the insertion source. Also, in the TableSink.emitDataStream() implementation, we need to insert a timestamp assigner operator that sets StreamRecord.timestamp (should we reuse an existing interface, or create a new timestampInserter interface?) and removes the extra rowtime field from StreamRecord.value before we emit the datastream to the sink. (For KafkaTableSink, we also need to invoke setWriteTimestampToKafka(true).)

Please correct me if I missed something here. What do you think?




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527653#comment-16527653
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6201
  
Hi, I think timestamp fields of source-sink tables should be handled as follows when emitting the table:
- `proc-time`: ignore.
- `from-field`: simply write out the timestamp as part of the row.
- `from-source`: write the timestamp separately to the system and remove it from the row. This only works if we can set the timestamp on the sink system. If the system sets the ingestion timestamp on its own, i.e., not the actual value, rows would contain different timestamps when they are ingested. If the sink system does not support setting a timestamp, we cannot allow such a table definition.
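
The rule in the last bullet amounts to a small validation check. A hedged sketch in plain Java, where `RowtimeMode`, `validate`, and the capability flag are illustrative names rather than Flink classes:

```java
// Sketch of the emission-side rule: 'from-source' is only legal if the sink
// system lets us set the record timestamp explicitly; the other two modes
// need no sink capability.
enum RowtimeMode { PROC_TIME, FROM_FIELD, FROM_SOURCE }

class SinkRowtimeValidator {
    static void validate(RowtimeMode mode, boolean sinkSupportsTimestamps) {
        if (mode == RowtimeMode.FROM_SOURCE && !sinkSupportsTimestamps) {
            throw new IllegalArgumentException(
                "Sink system cannot set record timestamps; 'from-source' is not allowed.");
        }
        // PROC_TIME is ignored on emission; FROM_FIELD is written as part of the row.
    }

    public static void main(String[] args) {
        validate(RowtimeMode.FROM_FIELD, false); // allowed regardless of sink capability
        try {
            validate(RowtimeMode.FROM_SOURCE, false);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping this as a validation step (rather than a user-facing property) matches the later suggestion that timestamp support be an internal attribute of the generated TableSink.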




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527379#comment-16527379
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6201
  
@suez1224 Yes sounds good to me. Only `from-field` timestamps matter right 
now. 

We should also think about the opposite of a timestamp extractor (a timestamp inserter) for cases where the rowtime needs some preprocessing (e.g. concatenation of a DATE and a TIME column), but we can deal with such cases in a follow-up issue.

A helper method would be useful. We already have something similar in 
`SchemaValidator` for schema derivation.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527250#comment-16527250
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199066851
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.sql.Timestamp
+import java.util
+import java.util.Collections
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
+import org.apache.flink.table.sources._
+import org.apache.flink.table.util.TableConnectorUtil
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object MemoryTableSourceSinkUtil {
+  var tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]()
+
+  def clear = {
+MemoryTableSourceSinkUtil.tableData.clear()
+  }
+
+  class UnsafeMemoryTableSource(tableSchema: TableSchema,
+returnType: TypeInformation[Row],
--- End diff --

done




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527249#comment-16527249
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199066578
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, String query) {
+   final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
+   // create job graph with dependencies
+   final String jobName = context.getSessionContext().getName() + ": " + query;
+   final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+   // create execution
+   new Thread(new ProgramDeployer<>(context, jobName, jobGraph, null)).start();
--- End diff --

Yes, let me put it into a separate PR. StatusResult makes sense to me.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527247#comment-16527247
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/6201
  
@twalthr, for a sink-only table, I don't think the user needs to define any 
rowtimes on it, since it will never be used as a source. For a table that is both 
a source and a sink, when registering it as a sink, I think we only need to take 
care of the 'from-field' columns, since they map to actual data fields in the 
table. Proctime and 'from-source' columns can simply be ignored when building the 
sink schema. Maybe we should have helper methods for building the source and sink 
schemas separately. Please correct me if I missed something here. 
What do you think?
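The rule described above — keep 'from-field' columns for the sink schema, drop proctime and 'from-source' columns — can be sketched as a small filter. This is a minimal standalone illustration with hypothetical field kinds and names, not the Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Minimal sketch of the sink-schema derivation rule: a "from-field" column
// maps to physical data and stays in the sink schema; "proctime" and
// "from-source" columns are derived on the source side and are dropped.
public class SinkSchemaSketch {

    enum FieldKind { FROM_FIELD, PROCTIME, FROM_SOURCE }

    static Map<String, String> deriveSinkSchema(Map<String, String> fieldTypes,
                                                Map<String, FieldKind> fieldKinds) {
        // keep only the fields that map to actual data in the table,
        // preserving the declared field order
        return fieldTypes.entrySet().stream()
                .filter(e -> fieldKinds.get(e.getKey()) == FieldKind.FROM_FIELD)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        Map<String, String> types = new LinkedHashMap<>();
        types.put("user", "STRING");
        types.put("amount", "DOUBLE");
        types.put("proc", "TIMESTAMP");
        Map<String, FieldKind> kinds = Map.of(
                "user", FieldKind.FROM_FIELD,
                "amount", FieldKind.FROM_FIELD,
                "proc", FieldKind.PROCTIME);
        System.out.println(deriveSinkSchema(types, kinds).keySet()); // prints [user, amount]
    }
}
```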




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527246#comment-16527246
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199065915
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, String query) {
+   final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
+   // create job graph with dependencies
+   final String jobName = context.getSessionContext().getName() + ": " + query;
+   final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+   // create execution
+   new Thread(new ProgramDeployer<>(context, jobName, jobGraph, null)).start();
--- End diff --

@twalthr, for a sink-only table, I don't think the user needs to define any 
rowtimes on it, since it will never be used as a source. For a table that is both 
a source and a sink, when registering it as a sink, I think we only need to take 
care of the 'from-field' columns, since they map to actual data fields in the 
table. `proctime` and 'from-source' columns can simply be ignored when building the 
sink schema. Maybe we should have helper methods for building the source and sink 
schemas separately. Please correct me if I missed something here. 
What do you think?




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525764#comment-16525764
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198678147
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connector/TableSinkFactoryServiceTest.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector
+
+import org.apache.flink.table.api.{NoMatchingTableConnectorException, TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.TableDescriptorValidator
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSinkFactoryServiceTest {
+  @Test
+  def testValidProperties(): Unit = {
+val props = properties()
+assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[NoMatchingTableConnectorException])
+  def testInvalidContext(): Unit = {
+val props = properties()
+props.put(CONNECTOR_TYPE, "FAIL")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+val props = properties()
+props.put(CONNECTOR_PROPERTY_VERSION, "2")
+// the table source should still be found
+assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+val props = properties()
+props.put("format.path_new", "/new/path")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+val props = properties()
+props.put("failing", "true")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  private def properties(): mutable.Map[String, String] = {
+val properties = mutable.Map[String, String]()
+properties.put(TableDescriptorValidator.TABLE_TYPE,
+  TableDescriptorValidator.TABLE_TYPE_VALUE_SINK)
+properties.put(CONNECTOR_TYPE, "test")
--- End diff --

good point, done




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525761#comment-16525761
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198677922
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
-  * describe the desired table source. The factory allows for matching to the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for all values.
+*
+* @return the table connector type,.
+*/
+  def tableType() : String
--- End diff --

sounds good.
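The `tableType()` dispatch discussed in this thread can be sketched as follows. The interfaces and property keys here are hypothetical stand-ins, not the actual Flink classes: each factory declares whether it builds a source or a sink, and lookup matches the `type` property in addition to the connector properties:

```java
import java.util.List;
import java.util.Map;

// Sketch of tableType-based factory dispatch (hypothetical names): lookup
// matches both the requested table type and the connector type, so a single
// registry can serve source, sink, and dual-purpose factories.
public class FactoryDispatchSketch {

    interface TableConnectorFactory<T> {
        String tableType();                     // "source", "sink", or "both"
        String connectorType();                 // e.g. value of "connector.type"
        T create(Map<String, String> props);
    }

    static <T> T findAndCreate(List<TableConnectorFactory<T>> factories,
                               Map<String, String> props) {
        String wanted = props.get("type");
        String connector = props.get("connector.type");
        return factories.stream()
                .filter(f -> (f.tableType().equals(wanted) || f.tableType().equals("both"))
                        && f.connectorType().equals(connector))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "No factory for type=" + wanted + ", connector=" + connector))
                .create(props);
    }
}
```

In the real implementation the factory list would come from `java.util.ServiceLoader` rather than being passed in, but the matching logic is the same idea.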




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525755#comment-16525755
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198677754
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
-  * describe the desired table source. The factory allows for matching to the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for all values.
+*
+* @return the table connector type,.
--- End diff --

done




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525757#comment-16525757
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198677762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
--- End diff --

Done




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524016#comment-16524016
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198218238
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, String query) {
+   final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
--- End diff --

Wrap it into a try-catch similar to org.apache.flink.table.client.gateway.local.LocalExecutor#createTable.
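The suggested pattern — running the user's SQL inside a try-catch and rethrowing as a single client-facing exception — can be sketched like this. `SqlClientException` is a stand-in name for illustration, not necessarily the class used in the PR:

```java
import java.util.function.Consumer;

// Sketch of the suggested try-catch wrapping: user-provided SQL is applied
// inside a try block, and any failure is rethrown as one client-facing
// exception type instead of leaking planner internals to the CLI.
public class ExecuteUpdateSketch {

    static class SqlClientException extends RuntimeException {
        SqlClientException(String message, Throwable cause) { super(message, cause); }
    }

    static void executeUpdate(Consumer<String> sqlUpdate, String query) {
        try {
            sqlUpdate.accept(query);          // e.g. tableEnv.sqlUpdate(query)
        } catch (Throwable t) {
            // surface a uniform, user-readable error with the original cause attached
            throw new SqlClientException("Invalid SQL update statement: " + query, t);
        }
    }
}
```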




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523996#comment-16523996
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135204
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
--- End diff --

Use plural `connectors`




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523997#comment-16523997
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137127
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories for the given properties.
+  * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
 
-  private lazy val defaultLoader = ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = ServiceLoader.load(classOf[TableConnectorFactory[_]])
 
-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): TableSource[_] = {
-findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+findAndCreateTableConnector(descriptor, null)
   }
 
-  def findAndCreateTableSource(
-  descriptor: TableSourceDescriptor,
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, classLoader: ClassLoader)
+  : T = {
 
 val properties = new DescriptorProperties()
 descriptor.addProperties(properties)
-findAndCreateTableSource(properties.asMap.asScala.toMap, classLoader)
+findAndCreateTableConnector(properties.asMap.asScala.toMap, classLoader)
   }
 
-  def findAndCreateTableSource(properties: Map[String, String]): TableSource[_] = {
-findAndCreateTableSource(properties, null)
+  def findAndCreateTableConnector(properties: Map[String, String]): T = {
+findAndCreateTableConnector(properties, null)
   }
 
-  def findAndCreateTableSource(
-  properties: Map[String, String],
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(properties: Map[String, String],
--- End diff --

This can fit into one line.



[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524000#comment-16524000
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198146766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {
 
 // check for supported properties
 plainProperties.foreach { k =>
-  if (!supportedProperties.contains(k)) {
+  if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && !supportedProperties.contains(k)) {
 throw new ValidationException(
   s"Table factory '${factory.getClass.getCanonicalName}' does not support the " +
-  s"property '$k'. Supported properties are: \n" +
-  s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
+s"property '$k'. Supported properties are: \n" +
+s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
   }
 }
 
-// create the table source
+// create the table connector
 try {
   factory.create(properties.asJava)
 } catch {
   case t: Throwable =>
 throw new TableException(
-  s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+  s"Table connector factory '${factory.getClass.getCanonicalName}' caused an exception.",
--- End diff --

There are more exception messages in this class that need an update.
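The validation loop shown in the diff above — every provided property must be declared as supported, with the reserved "type" key exempted because it routes between source and sink factories — can be sketched in plain Java (hypothetical names, not the Flink classes):

```java
import java.util.Map;
import java.util.Set;

// Java sketch of the supported-property check: any key outside the factory's
// declared set is rejected, except the reserved "type" routing key.
public class PropertyValidationSketch {

    static final String TABLE_TYPE_KEY = "type";

    static void validate(Map<String, String> props, Set<String> supported) {
        for (String key : props.keySet()) {
            if (!key.equals(TABLE_TYPE_KEY) && !supported.contains(key)) {
                // the error message lists the supported keys, mirroring the
                // ValidationException message in the diff
                throw new IllegalArgumentException(
                        "Unsupported property '" + key
                                + "'. Supported properties are: " + supported);
            }
        }
    }
}
```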




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524009#comment-16524009
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198215478
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.sql.Timestamp
+import java.util
+import java.util.Collections
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sinks.{AppendStreamTableSink, 
BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
+import org.apache.flink.table.sources._
+import org.apache.flink.table.util.TableConnectorUtil
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object MemoryTableSourceSinkUtil {
+  var tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]()
+
+  def clear = {
+MemoryTableSourceSinkUtil.tableData.clear()
+  }
+
+  class UnsafeMemoryTableSource(tableSchema: TableSchema,
+returnType: TypeInformation[Row],
--- End diff --

We usually indent differently. Take
`org.apache.flink.table.codegen.CodeGenerator` as an example.
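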




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523998#comment-16523998
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198136187
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type,.
+*/
+  def tableType() : String
--- End diff --

Rename to `getType()`?
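
With the suggested rename applied, the factory trait could look like the following hedged sketch. The constant values follow the JIRA description ("source", "sink", "both"); the `TableTypes` object name is hypothetical, not the merged Flink code.

```scala
// Possible values for the connector type, per the JIRA description.
object TableTypes {
  val Source = "source"
  val Sink   = "sink"
  val Both   = "both"
}

trait TableConnectorFactory[T] {
  /** Returns the table connector type: source, sink, or both. */
  def getType(): String

  /** Creates a configured connector from normalized properties. */
  def create(properties: java.util.Map[String, String]): T
}
```

A sink factory would then report its type via `getType()` instead of carrying it as a regular property:

```scala
class DummySinkFactory extends TableConnectorFactory[String] {
  override def getType(): String = TableTypes.Sink
  override def create(properties: java.util.Map[String, String]): String = "sink"
}
```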




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524002#comment-16524002
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198148852
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Please add a comment to this method as it will be public in a Java API 
(similar to TableSourceDescriptor).




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524008#comment-16524008
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198151603
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Do we need a `TableSinkDescriptor`? Can't we unify `TableSinkDescriptor` 
and `TableSourceDescriptor` to `TableDescriptor`?




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524015#comment-16524015
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198223665
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, 
String query) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
+   // create job graph with dependencies
+   final String jobName = context.getSessionContext().getName() + 
": " + query;
+   final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+   // create execution
+   new Thread(new ProgramDeployer<>(context, jobName, jobGraph, 
null)).start();
--- End diff --

I think even a detached job needs to return a result. Otherwise you cannot 
be sure if the job has been submitted or not. E.g., the cluster might not be 
reachable. In any case, every created thread should be managed by the result 
store. So we should have a similar architecture as for queries. Maybe instead 
of `CollectStreamResult` a `StatusResult`. Maybe we should do the SQL Client 
changes in a separate PR?
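
The architecture the reviewer describes can be sketched as follows. This is a hedged illustration only: `StatusResult` and `deployDetached` are hypothetical names taken from the comment, not Flink API; the point is that even a fire-and-forget submission hands back a handle on which submission failures (e.g. an unreachable cluster) surface.

```scala
import java.util.concurrent.{CompletableFuture, Executors}

// A detached submission still returns a result handle instead of a bare Thread.
final class StatusResult(val jobName: String, val submission: CompletableFuture[Unit])

object Deployer {
  private val pool = Executors.newCachedThreadPool()

  def deployDetached(jobName: String)(submit: () => Unit): StatusResult = {
    val status = new CompletableFuture[Unit]()
    pool.submit(new Runnable {
      override def run(): Unit =
        try { submit(); status.complete(()) }                    // submitted successfully
        catch { case t: Throwable => status.completeExceptionally(t) } // surface the failure
    })
    new StatusResult(jobName, status)
  }
}
```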




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524011#comment-16524011
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198215161
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -712,7 +712,48 @@ class SqlITCase extends StreamingWithStateTestBase {
   "1,1,Hi,1970-01-01 00:00:00.001",
   "2,2,Hello,1970-01-01 00:00:00.002",
   "3,2,Hello world,1970-01-01 00:00:00.002")
-assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+assertEquals(expected.sorted, 
MemoryTableSourceSinkUtil.tableData.map(_.toString).sorted)
+  }
+
+  @Test
+  def testWriteReadTableSourceSink(): Unit = {
+var env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+var tEnv = TableEnvironment.getTableEnvironment(env)
+MemoryTableSourceSinkUtil.clear
+
+val t = StreamTestData.getSmall3TupleDataStream(env)
+  .assignAscendingTimestamps(x => x._2)
+  .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+tEnv.registerTable("sourceTable", t)
+
+val fieldNames = Array("a", "e", "f", "t")
+val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, 
Types.SQL_TIMESTAMP)
+  .asInstanceOf[Array[TypeInformation[_]]]
+
+val tableSchema = new TableSchema(
+  Array("a", "e", "f", "t", "rowtime", "proctime"),
+  Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP,
+Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
+val returnType = new RowTypeInfo(
+  Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+.asInstanceOf[Array[TypeInformation[_]]],
+  Array("a", "e", "f", "t"))
+tEnv.registerTableSource("targetTable", new 
MemoryTableSourceSinkUtil.UnsafeMemoryTableSource(
+  tableSchema, returnType, "rowtime", 3))
+tEnv.registerTableSink("targetTable",
+  new 
MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames, 
fieldTypes))
+
+tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM 
sourceTable")
+tEnv.sqlQuery("SELECT a, e, f, t, rowtime from targetTable")
--- End diff --

I think we need more test cases about how we handle the time attributes for 
`both` table types. Maybe not only ITCases but also unit tests. The `configure` 
method is an internal method that should not be called here.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524005#comment-16524005
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137230
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
 
-  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableConnectorFactory[_]])
 
-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): 
TableSource[_] = {
-findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+findAndCreateTableConnector(descriptor, null)
   }
 
-  def findAndCreateTableSource(
-  descriptor: TableSourceDescriptor,
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, 
classLoader: ClassLoader)
--- End diff --

This can fit into one line.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524013#comment-16524013
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198154676
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 ---
@@ -39,9 +39,12 @@ abstract class PhysicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
-  case _: StreamTableSourceTable[_] => true
-  case _: BatchTableSourceTable[_] => false
+val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) 
match {
+  case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
+case _: StreamTableSourceTable[_] => true
--- End diff --

The patterns are incorrect since the method returns an `Option`.
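
The review point can be shown with a minimal sketch using stand-in types (not Flink's): matching an `Option` directly against the element type never succeeds, so the cases must be wrapped in `Some(...)` and the `None` case handled explicitly.

```scala
sealed trait SourceTable
final class StreamSourceTable extends SourceTable
final class BatchSourceTable  extends SourceTable

def isStreaming(tableOpt: Option[SourceTable]): Boolean = tableOpt match {
  case Some(_: StreamSourceTable) => true   // match inside the Option
  case Some(_: BatchSourceTable)  => false
  case None                       => false  // case the broken version omits
}
```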




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524012#comment-16524012
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198147548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
 ---
@@ -27,6 +28,8 @@ class FileSystem extends ConnectorDescriptor(
 CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) {
 
   private var path: Option[String] = None
+  private var numFiles: Option[Int] = None
--- End diff --

Please remove these unrelated changes and open a separate issue for them.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523999#comment-16523999
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198146395
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {
 
 // check for supported properties
 plainProperties.foreach { k =>
-  if (!supportedProperties.contains(k)) {
+  if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && 
!supportedProperties.contains(k)) {
--- End diff --

The table type must not be part of the supported properties as it is 
defined in a separate method in the `TableConnectorFactory` interface.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524003#comment-16524003
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135694
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type,.
--- End diff --

remove comma




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524007#comment-16524007
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198207327
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connector/TableSinkFactoryServiceTest.scala ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector
+
+import org.apache.flink.table.api.{NoMatchingTableConnectorException, TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.TableDescriptorValidator
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSinkFactoryServiceTest {
+  @Test
+  def testValidProperties(): Unit = {
+    val props = properties()
+    assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[NoMatchingTableConnectorException])
+  def testInvalidContext(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, "FAIL")
+    TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_PROPERTY_VERSION, "2")
+    // the table source should still be found
+    assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+    val props = properties()
+    props.put("format.path_new", "/new/path")
+    TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+    val props = properties()
+    props.put("failing", "true")
+    TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  private def properties(): mutable.Map[String, String] = {
+    val properties = mutable.Map[String, String]()
+    properties.put(TableDescriptorValidator.TABLE_TYPE,
+      TableDescriptorValidator.TABLE_TYPE_VALUE_SINK)
+    properties.put(CONNECTOR_TYPE, "test")
--- End diff --

I would use strings here for everything (not the variables). This allows tests to fail if we refactor one of the properties.
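The suggestion above is a common test-hardening trick: spell property keys as string literals in tests rather than reusing the production constants, so that an accidental change to a constant's value fails the test instead of being silently carried along. A small hedged sketch (the key name `connector.type` mirrors the ones in this PR, but the class is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class LiteralKeysDemo {
    // Production-side constant (illustrative stand-in for the validator constants).
    static final String CONNECTOR_TYPE = "connector.type";

    /** Production code path: checks the connector type via the constant. */
    static boolean isTestConnector(Map<String, String> props) {
        return "test".equals(props.get(CONNECTOR_TYPE));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Literal key on purpose: if CONNECTOR_TYPE were ever changed to a different
        // string, this check would fail instead of being rewritten along with the constant.
        props.put("connector.type", "test");
        System.out.println(isTestConnector(props)); // prints "true"
    }
}
```

If the test had used `props.put(CONNECTOR_TYPE, "test")`, a rename of the underlying string would refactor the test in lockstep and the incompatibility would go unnoticed.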




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524010#comment-16524010
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198160808
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+
+class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]],
--- End diff --

Call this `ExternalTable` or `ConnectorTable`?

Maybe we can add more helper methods here like `hasSource()`, `getSource` in order to reduce the many pattern matching changes in this PR.

I'm also wondering if we need the separate classes `TableSinkTable` and `TableSourceTable` anymore. We could store `TableSink` and `TableSource` directly in `UnifiedTable`
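The helper-method suggestion above replaces repeated `Option` pattern matching at call sites with simple predicates and accessors on one combined table class. A hedged Java sketch of that shape (class and method names are illustrative, not the names Flink settled on):

```java
import java.util.Optional;

/** Illustrative combined table: holds an optional source and an optional sink,
 *  with helpers that replace repeated Option pattern matching at call sites. */
public class ConnectorTableSketch<S, K> {
    private final Optional<S> source;
    private final Optional<K> sink;

    public ConnectorTableSketch(Optional<S> source, Optional<K> sink) {
        this.source = source;
        this.sink = sink;
    }

    public boolean hasSource() { return source.isPresent(); }
    public boolean hasSink()   { return sink.isPresent(); }
    public S getSource()       { return source.orElseThrow(); } // throws if absent
    public K getSink()         { return sink.orElseThrow(); }

    public static void main(String[] args) {
        ConnectorTableSketch<String, String> t =
                new ConnectorTableSketch<>(Optional.of("csv-source"), Optional.empty());
        System.out.println(t.hasSource()); // prints "true"
        System.out.println(t.hasSink());   // prints "false"
    }
}
```

Callers can then write `if (t.hasSource()) use(t.getSource());` instead of matching on `Option` pairs in every planner rule touched by the PR.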




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524001#comment-16524001
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198134945
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
--- End diff --

Add the updated comment again.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524014#comment-16524014
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198219005
  
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private <T> void executeUpdateInternal(ExecutionContext<T> context, String query) {
+   final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
--- End diff --

We also need to ship the query config here.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524004#comment-16524004
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198142339
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala ---
@@ -98,10 +98,12 @@ object TableSourceFactoryService extends Logging {
 plainContext.remove(STATISTICS_PROPERTY_VERSION)
 
 // check if required context is met
-    if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
+    if (properties.get(TableDescriptorValidator.TABLE_TYPE).get.equals(factory.tableType()) &&
--- End diff --

Consider cases where the type has not been set. Btw `properties.get(TableDescriptorValidator.TABLE_TYPE).get` can be simplified to `properties(TableDescriptorValidator.TABLE_TYPE)`. It might be useful to enable more warnings in your IDE.
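The first half of the comment above is the important one: calling `.get` on the result of a map lookup throws when the "type" property was never set, so the lookup should be defensive. A hedged Java sketch of the same fix (the key name `type` mirrors the PR's property; the class is illustrative):

```java
import java.util.Map;
import java.util.Optional;

public class TypeLookupDemo {
    // Illustrative stand-in for TableDescriptorValidator.TABLE_TYPE.
    static final String TABLE_TYPE = "type";

    /** True only when the property is present AND equal to the factory's type. */
    static boolean matchesType(Map<String, String> props, String factoryType) {
        // Optional.ofNullable avoids the exception a bare get() would
        // cause when the property was never set.
        return Optional.ofNullable(props.get(TABLE_TYPE))
                .map(factoryType::equals)
                .orElse(false);
    }

    public static void main(String[] args) {
        System.out.println(matchesType(Map.of("type", "sink"), "sink")); // prints "true"
        System.out.println(matchesType(Map.of(), "sink"));               // prints "false"
    }
}
```

In the Scala code under review the equivalent would be `properties.get(TABLE_TYPE).contains(factory.tableType())`, which handles the absent-key case without an unguarded `.get`.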




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524006#comment-16524006
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137289
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories for the given properties.
+  * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

Make abstract?




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523995#comment-16523995
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135589
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
-  * describe the desired table source. The factory allows for matching to the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

Actually, we could also simplify this and call it `TableFactory`. What do you think? We also call it `CREATE TABLE`, not `CREATE TABLE CONNECTOR`, in SQL.




[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520057#comment-16520057
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/6201

[FLINK-8866][Table API & SQL] Add support for unified table sink instantiation

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

Add interfaces to support unified table sink configuration and instantiation. Consolidate table source and table sink configuration and instantiation.


## Brief change log

- Consolidate table sink and table source instantiation with TableConnectorFactory{Service}.
- Add support to register a Calcite table with both tableSource and tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.

## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for registering table source and sink tables under the same name.
  - Added integration tests for the insert into command in the SQL client.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-8866-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6201


commit fb1a021f86668f326f6322b52a324da8e6eb1f47
Author: Shuyi Chen 
Date:   2018-06-19T19:00:34Z

- Add unified table sink instantiation.
- Consolidate table sink and table source instantiation.
- Add support to register a Calcite table with both tableSource and tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.






[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-05-16 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477127#comment-16477127
 ] 

Fabian Hueske commented on FLINK-8866:
--

Regarding (3), originally, {{TableSink}} was designed to be flexible with regard to the output schema. A query that should be emitted was planned by the optimizer. Based on the resulting type, the TableSink was internally configured for the result schema. The configuration method produces a configured copy of the sink that is used to emit the result. So, the TableSink was not known to Calcite and was only handled by the TableEnvironment.

When we added support for {{INSERT INTO}} this didn't work anymore, because Calcite validates that the schema of the target table is compatible with the result schema of the SELECT query. Hence, we added the field names and types to the registration, configure the TableSink, and register the newly configured TableSink in Calcite's catalog. By doing it this way, we did not have to change the interface of the TableSink, which meant not only backwards compatibility but also that all TableSinks can be used in either way.
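The configure-and-copy pattern described above can be sketched in a few lines: configuring a sink with the query's result schema returns a new, configured copy, leaving the registered prototype untouched. The class below is an illustrative stand-in for the pattern, not Flink's exact `TableSink` interface (which also takes field types).

```java
import java.util.Arrays;

public class CopyOnConfigureSink {
    private final String[] fieldNames;

    private CopyOnConfigureSink(String[] fieldNames) { this.fieldNames = fieldNames; }

    /** The unconfigured sink as registered in the catalog. */
    public static CopyOnConfigureSink prototype() {
        return new CopyOnConfigureSink(new String[0]);
    }

    /** Returns a configured copy for the query's result schema; the receiver is not mutated. */
    public CopyOnConfigureSink configure(String[] names) {
        return new CopyOnConfigureSink(names.clone());
    }

    public String[] getFieldNames() { return fieldNames.clone(); }

    public static void main(String[] args) {
        CopyOnConfigureSink proto = prototype();
        CopyOnConfigureSink configured = proto.configure(new String[]{"id", "name"});
        System.out.println(Arrays.toString(proto.getFieldNames()));      // prints "[]"
        System.out.println(Arrays.toString(configured.getFieldNames())); // prints "[id, name]"
    }
}
```

Because the copy, not the prototype, is handed to the runtime, the same registered sink can be reused by queries with different result schemas, which is what made the pre-`INSERT INTO` design flexible.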



[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-05-15 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16475416#comment-16475416
 ] 

Timo Walther commented on FLINK-8866:
-

I agree with (1); we could definitely reduce the amount of copied code. A design doc would be very helpful for discussing (2). The need for field names and types should depend on the connector/format or should serve as a safety barrier before writing.



[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-05-14 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16475177#comment-16475177
 ] 

Rong Rong commented on FLINK-8866:
--

+1 on points (1) and (2). I found huge chunks of code being copied when trying to create the TableSink[Factory/FactoryService] components with the current architecture.

Regarding (3), I think there might be some SinkFunctions where fieldName and fieldType are necessary to validate during initialization of the sink function (such as the JDBC sink, where the underlying JDBC driver is loaded at runtime, I believe). What do you think? Should we still consider having them as an *optional* part of the configuration?



[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-05-14 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474801#comment-16474801
 ] 

Shuyi Chen commented on FLINK-8866:
---

Hi [~walterddr], [~twalthr], [~fhueske], thanks a lot for the comments. I think there are a few challenges for this JIRA:

1) There can be a lot of duplicate code dealing with unified table sink instantiation if we do it the same way as TableSourceFactory/TableSourceFactoryService, so we should try to refactor/redesign it to make it cleaner.

2) To support a table that can be both source and sink, we need a unified interface, at least when interacting with Calcite, so the same table name can be used for the source and sink in SQL.

3) When registering TableSinks, the current registerTableSink interface takes additional parameters _fieldName_ and _fieldTypes_, which I don't think are necessary and which add complexity when integrating with SQL DDL and the SQL client.

I am experimenting with the changes needed in my local branch and writing a design doc. I would love to share the design doc soon, when I think it's ready.

> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.
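The environment-file layout proposed in points 2 and 3 above might look roughly like this (a hypothetical sketch: only the tableType key comes from the description, all other keys and values are illustrative):

```yaml
tables:                        # replaces the former "sources" section
  - name: orders
    tableType: source          # this table can only be read from
    connector:
      type: filesystem
      path: /data/orders.csv
  - name: order_summary
    tableType: sink            # usable as the target of INSERT INTO
    connector:
      type: filesystem
      path: /data/summary.csv
  - name: audit_log
    tableType: both            # one name serves reads and writes in SQL
    connector:
      type: filesystem
      path: /data/audit.csv
```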



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-05-14 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474463#comment-16474463
 ] 

Rong Rong commented on FLINK-8866:
--

I think they should be designed to be usable in both source and sink cases. It 
doesn't really make sense to separate them, at least not in [~twalthr]'s Kafka 
example, and I don't think there's any difference for others either. The 
only thing I can think of is a security/authorization concern: e.g., whether 
sharing configurations for read-only access and write access in the same 
location is safe or not. [~suez1224], what do you think?

I haven't implemented the "both" part; this diff merely tries to clear up some 
questions I had while implementing FLINK-8880. Also, there are some components 
that I think can definitely be unified at the Table API level, such as the 
Source/Sink factory service, which would make the APIs more consistent. What do 
you guys think?

--
Rong



[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-05-14 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474115#comment-16474115
 ] 

Fabian Hueske commented on FLINK-8866:
--

I think we should design the system properties in a way that they can be used 
for both sources and sinks. For most connectors, the source and sink properties 
should not differ.



[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-05-14 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474096#comment-16474096
 ] 

Timo Walther commented on FLINK-8866:
-

Hi [~walterddr], the changes for FLINK-8866 look good to me. I wonder whether 
this design also works for concrete implementations of connectors. E.g., if we 
use the table type "both", would a Kafka table source and sink require the same 
properties as a regular "source", or slightly different properties?
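To make the question concrete: with a "both" table, the intent would presumably be that the Kafka descriptor reuses exactly the properties a "source" declares, with nothing direction-specific added. A hypothetical sketch (the property keys are illustrative, not Flink's actual connector keys):

```yaml
- name: clicks
  tableType: both             # readable as a source, writable as a sink
  connector:                  # identical to what a plain "source" would declare
    type: kafka
    version: "0.11"
    topic: clicks
    properties:
      bootstrap.servers: localhost:9092
  format:
    type: json
```

If a connector did need sink-only properties (e.g. a write-side buffer size), those would have to be optional for the source direction.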



[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-05-11 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472571#comment-16472571
 ] 

Rong Rong commented on FLINK-8866:
--

Hi [~suez1224]

I was working on FLINK-8880 and thought it would be great to have some 
understanding of how this JIRA is going to be implemented, since some of the 
functionality in the description is already implemented in FLINK-9059.

I had a preliminary version that only supports "source" and "sink" and was 
wondering if this could be part of the solution: 
https://github.com/apache/flink/compare/master...walterddr:FLINK-8866

Please kindly take a look, many thanks 

Rong
