[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6201


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-08 Thread suez1224
GitHub user suez1224 reopened a pull request:

https://github.com/apache/flink/pull/6201

[FLINK-8866][Table API & SQL] Add support for unified table sink 
instantiation


## What is the purpose of the change

Add interfaces to support unified table sink configuration and instantiation, 
and consolidate how table sources and table sinks are configured and 
instantiated.


## Brief change log

- Consolidate table sink and table source instantiation with 
TableConnectorFactory{Service}.
- Add support to register a Calcite table with both a tableSource and a 
tableSink (see the usage sketch below).
- Add INSERT INTO command support in the SQL client.
- Add CsvTableSinkFactory.
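A minimal usage sketch of what the consolidated registration enables, written 
against the 1.5/1.6-era Table API; the table names, file paths, and schema are 
illustrative assumptions rather than anything taken from this PR:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

object SourceSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Register a CSV-backed source.
    val source = CsvTableSource.builder()
      .path("/tmp/orders-in.csv")
      .field("id", Types.LONG)
      .field("product", Types.STRING)
      .build()
    tEnv.registerTableSource("orders", source)

    // Register a CSV sink; with this PR a name can hold a source, a sink,
    // or both.
    val sink = new CsvTableSink("/tmp/orders-out.csv", ",")
    tEnv.registerTableSink(
      "orders_out",
      Array("id", "product"),
      Array[TypeInformation[_]](Types.LONG, Types.STRING),
      sink)

    // The SQL client INSERT INTO support builds on sqlUpdate.
    tEnv.sqlUpdate("INSERT INTO orders_out SELECT id, product FROM orders")
    env.execute()
  }
}
```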

## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for registering table source and sink tables 
under the same name.
  - Added integration tests for the INSERT INTO command in the SQL client.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-8866-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6201


commit d436041fda022f405fa3a1d497b95d2ff0934ce5
Author: Shuyi Chen 
Date:   2018-06-19T19:00:34Z

- Add unified table sink instantiation.
- Consolidate table sink and table source instantiation.
- Add support to register a Calcite table with both tableSource and 
tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.

commit d70d033abec0806f1723b18f7bcbab1f60ec7280
Author: Shuyi Chen 
Date:   2018-06-28T18:30:21Z

comment fixes

commit 0649359106ddfa003c44e3d0221f7f52ac507593
Author: Shuyi Chen 
Date:   2018-07-06T23:31:05Z

refactor:
1) add TableFactoryDiscoverable trait
2) add util for handling rowtime/proctime for table schema and unittests




---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-08 Thread suez1224
Github user suez1224 closed the pull request at:

https://github.com/apache/flink/pull/6201


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-07 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200806113
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 ---
@@ -39,9 +39,12 @@ abstract class PhysicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
-  case _: StreamTableSourceTable[_] => true
-  case _: BatchTableSourceTable[_] => false
+val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) 
match {
+  case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
+case _: StreamTableSourceTable[_] => true
--- End diff --

good catch.
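For context, the "good catch" refers to the inner match: `tableSourceTableOpt` 
is an `Option`, so a bare type pattern like `_: StreamTableSourceTable[_]` 
tests the `Option` itself and can never match its contents (the compiler 
typically rejects it outright). A self-contained sketch of the pitfall with 
hypothetical stand-in types:

```scala
object OptionMatchPitfall {
  sealed trait SourceTable
  final class StreamSourceTable extends SourceTable
  final class BatchSourceTable extends SourceTable

  def isStreaming(opt: Option[SourceTable]): Boolean = opt match {
    // Wrong: `case _: StreamSourceTable => true` would test the Option
    // itself and never match; the type patterns must be wrapped in Some(...).
    case Some(_: StreamSourceTable) => true
    case Some(_: BatchSourceTable)  => false
    case _ => throw new IllegalStateException("no table source registered")
  }

  def main(args: Array[String]): Unit = {
    println(isStreaming(Some(new StreamSourceTable))) // prints: true
  }
}
```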


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-07 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200806045
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
 ---
@@ -27,6 +28,8 @@ class FileSystem extends ConnectorDescriptor(
 CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) {
 
   private var path: Option[String] = None
+  private var numFiles: Option[Int] = None
--- End diff --

This is for the CsvTableSinkFactory.
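For reference, a sketch of the sink constructor that eventually consumes these 
descriptor settings, against the 1.5-era API; the path and values are made up:

```scala
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.sinks.CsvTableSink

object NumFilesSketch {
  // CsvTableSink's extended constructor takes the number of output files
  // and the write mode, which is why the FileSystem descriptor (and hence
  // CsvTableSinkFactory) needs to carry them.
  val sink = new CsvTableSink("/tmp/out.csv", ",", 1, WriteMode.OVERWRITE)
}
```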


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-07 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200805902
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -56,23 +58,44 @@ public Environment() {
return tables;
}
 
+   private static TableDescriptor create(String name, Map<String, Object> config) {
+   if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) 
{
+   throw new SqlClientException("The 'type' attribute of a 
table is missing.");
+   }
+   final String tableType = (String) 
config.get(TableDescriptorValidator.TABLE_TYPE());
+   if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+   return new Source(name, 
ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+   return new Sink(name, ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+   return new SourceSink(name, 
ConfigUtil.normalizeYaml(config));
+   }
+   return null;
+   }
+
public void setTables(List<Map<String, Object>> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
-   if 
(!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-   throw new SqlClientException("The 'type' 
attribute of a table is missing.");
+   if (!config.containsKey(NAME)) {
+   throw new SqlClientException("The 'name' 
attribute of a table is missing.");
}
-   if 
(config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
 {
-   
config.remove(TableDescriptorValidator.TABLE_TYPE());
-   final Source s = Source.create(config);
-   if (this.tables.containsKey(s.getName())) {
-   throw new SqlClientException("Duplicate 
source name '" + s + "'.");
-   }
-   this.tables.put(s.getName(), s);
-   } else {
+   final Object name = config.get(NAME);
+   if (name == null || !(name instanceof String) || 
((String) name).length() <= 0) {
+   throw new SqlClientException("Invalid table 
name '" + name + "'.");
+   }
+   final String tableName = (String) name;
+   final Map<String, Object> properties = new HashMap<>(config);
+   properties.remove(NAME);
+
+   TableDescriptor tableDescriptor = create(tableName, 
properties);
+   if (null == tableDescriptor) {
throw new SqlClientException(
-   "Invalid table 'type' attribute 
value, only 'source' is supported");
+   "Invalid table 'type' attribute 
value, only 'source' or 'sink' is supported");
+   }
+   if (this.tables.containsKey(tableName)) {
+   throw new SqlClientException("Duplicate table 
name '" + tableName + "'.");
--- End diff --

The current implementation allows source and sink only combined in a single 
table entry; two entries with the same name are rejected.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-07 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200805881
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -56,23 +58,44 @@ public Environment() {
return tables;
}
 
+   private static TableDescriptor create(String name, Map<String, Object> config) {
+   if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) 
{
+   throw new SqlClientException("The 'type' attribute of a 
table is missing.");
+   }
+   final String tableType = (String) 
config.get(TableDescriptorValidator.TABLE_TYPE());
+   if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+   return new Source(name, 
ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+   return new Sink(name, ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+   return new SourceSink(name, 
ConfigUtil.normalizeYaml(config));
+   }
+   return null;
+   }
+
public void setTables(List<Map<String, Object>> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
-   if 
(!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-   throw new SqlClientException("The 'type' 
attribute of a table is missing.");
+   if (!config.containsKey(NAME)) {
+   throw new SqlClientException("The 'name' 
attribute of a table is missing.");
}
-   if 
(config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
 {
-   
config.remove(TableDescriptorValidator.TABLE_TYPE());
-   final Source s = Source.create(config);
-   if (this.tables.containsKey(s.getName())) {
-   throw new SqlClientException("Duplicate 
source name '" + s + "'.");
-   }
-   this.tables.put(s.getName(), s);
-   } else {
+   final Object name = config.get(NAME);
+   if (name == null || !(name instanceof String) || 
((String) name).length() <= 0) {
+   throw new SqlClientException("Invalid table 
name '" + name + "'.");
+   }
+   final String tableName = (String) name;
+   final Map<String, Object> properties = new HashMap<>(config);
+   properties.remove(NAME);
+
+   TableDescriptor tableDescriptor = create(tableName, 
properties);
+   if (null == tableDescriptor) {
throw new SqlClientException(
-   "Invalid table 'type' attribute 
value, only 'source' is supported");
+   "Invalid table 'type' attribute 
value, only 'source' or 'sink' is supported");
--- End diff --

good catch. thanks


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200523824
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+
+class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: 
Option[TableSourceTable[T1]],
--- End diff --

Huge +1. My understanding is this will be the overall class to hold a table 
source, sink or both. `TableSourceSinkTable` seems redundant. 


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200522491
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -160,10 +173,34 @@ abstract class BatchTableEnvironment(
   throw new TableException("Same number of field names and types 
required.")
 }
 
-tableSink match {
+val configuredSink = tableSink.configure(fieldNames, fieldTypes)
+registerTableSinkInternal(name, configuredSink)
+  }
+
+  def registerTableSink(name: String, configuredSink: TableSink[_]): Unit 
= {
--- End diff --

could probably move this to the base class `TableEnvironment`?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200524149
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

also just `TableFactoryService` ?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200521017
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -56,23 +58,44 @@ public Environment() {
return tables;
}
 
+   private static TableDescriptor create(String name, Map<String, Object> config) {
+   if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) 
{
+   throw new SqlClientException("The 'type' attribute of a 
table is missing.");
+   }
+   final String tableType = (String) 
config.get(TableDescriptorValidator.TABLE_TYPE());
+   if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+   return new Source(name, 
ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+   return new Sink(name, ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+   return new SourceSink(name, 
ConfigUtil.normalizeYaml(config));
+   }
+   return null;
+   }
+
public void setTables(List<Map<String, Object>> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
-   if 
(!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-   throw new SqlClientException("The 'type' 
attribute of a table is missing.");
+   if (!config.containsKey(NAME)) {
+   throw new SqlClientException("The 'name' 
attribute of a table is missing.");
}
-   if 
(config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
 {
-   
config.remove(TableDescriptorValidator.TABLE_TYPE());
-   final Source s = Source.create(config);
-   if (this.tables.containsKey(s.getName())) {
-   throw new SqlClientException("Duplicate 
source name '" + s + "'.");
-   }
-   this.tables.put(s.getName(), s);
-   } else {
+   final Object name = config.get(NAME);
+   if (name == null || !(name instanceof String) || 
((String) name).length() <= 0) {
+   throw new SqlClientException("Invalid table 
name '" + name + "'.");
+   }
+   final String tableName = (String) name;
+   final Map<String, Object> properties = new HashMap<>(config);
+   properties.remove(NAME);
+
+   TableDescriptor tableDescriptor = create(tableName, 
properties);
+   if (null == tableDescriptor) {
throw new SqlClientException(
-   "Invalid table 'type' attribute 
value, only 'source' is supported");
+   "Invalid table 'type' attribute 
value, only 'source' or 'sink' is supported");
--- End diff --

missing `both` ?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200524249
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

+1 Should be able to unify


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200524110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

+1 


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200523261
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

+1 I think the most baffling point up to here has been the 
`Table*Connector*Factory` naming :-)


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200521222
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -56,23 +58,44 @@ public Environment() {
return tables;
}
 
+   private static TableDescriptor create(String name, Map<String, Object> config) {
+   if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) 
{
+   throw new SqlClientException("The 'type' attribute of a 
table is missing.");
+   }
+   final String tableType = (String) 
config.get(TableDescriptorValidator.TABLE_TYPE());
+   if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+   return new Source(name, 
ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+   return new Sink(name, ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+   return new SourceSink(name, 
ConfigUtil.normalizeYaml(config));
+   }
+   return null;
+   }
+
public void setTables(List<Map<String, Object>> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
-   if 
(!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-   throw new SqlClientException("The 'type' 
attribute of a table is missing.");
+   if (!config.containsKey(NAME)) {
+   throw new SqlClientException("The 'name' 
attribute of a table is missing.");
}
-   if 
(config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
 {
-   
config.remove(TableDescriptorValidator.TABLE_TYPE());
-   final Source s = Source.create(config);
-   if (this.tables.containsKey(s.getName())) {
-   throw new SqlClientException("Duplicate 
source name '" + s + "'.");
-   }
-   this.tables.put(s.getName(), s);
-   } else {
+   final Object name = config.get(NAME);
+   if (name == null || !(name instanceof String) || 
((String) name).length() <= 0) {
+   throw new SqlClientException("Invalid table 
name '" + name + "'.");
+   }
+   final String tableName = (String) name;
+   final Map<String, Object> properties = new HashMap<>(config);
+   properties.remove(NAME);
+
+   TableDescriptor tableDescriptor = create(tableName, 
properties);
+   if (null == tableDescriptor) {
throw new SqlClientException(
-   "Invalid table 'type' attribute 
value, only 'source' is supported");
+   "Invalid table 'type' attribute 
value, only 'source' or 'sink' is supported");
+   }
+   if (this.tables.containsKey(tableName)) {
+   throw new SqlClientException("Duplicate table 
name '" + tableName + "'.");
--- End diff --

If only `"source"` and `"sink"` are allowed, should we allow the same name 
but different types, e.g. `{"name": "t1", "type": "source"}` and `{"name": "t1", 
"type": "sink"}` co-existing? This is actually following up on the previous 
comment. I think we just need one; either should work.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-04 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200045769
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

@suez1224 Actually, I liked `CREATE TABLE` because it is closer to SQL. The 
reason why I proposed `TableFactory` was because the factory does much more 
than just constructing a connector. It also performs schema validation, format 
discovery and so on. 
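The point that the factory does more than construct a connector shows up in 
its contract. A sketch: `create`, `supportedProperties`, and the type 
discriminator appear in the quoted diffs, while `requiredContext` is an 
assumption carried over from the pre-existing factory interface:

```scala
import java.util

trait TableFactory[T] {
  /** Discriminates source vs. sink factories (renaming to getType() was also proposed). */
  def tableType(): String

  /** Properties that must match exactly for this factory to be selected. */
  def requiredContext(): util.Map[String, String]

  /** Property keys the factory understands; unknown keys fail validation. */
  def supportedProperties(): util.List[String]

  /** Creates the configured table source or sink from normalized properties. */
  def create(properties: util.Map[String, String]): T
}
```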


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-03 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199951101
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

Sounds good. Also, I've updated the DDL design doc to call it TABLE 
CONNECTOR, which I think is clearer.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-29 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199066851
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.sql.Timestamp
+import java.util
+import java.util.Collections
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sinks.{AppendStreamTableSink, 
BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
+import org.apache.flink.table.sources._
+import org.apache.flink.table.util.TableConnectorUtil
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object MemoryTableSourceSinkUtil {
+  var tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]()
+
+  def clear = {
+MemoryTableSourceSinkUtil.tableData.clear()
+  }
+
+  class UnsafeMemoryTableSource(tableSchema: TableSchema,
+returnType: TypeInformation[Row],
--- End diff --

done


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-29 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199066578
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private <T> void executeUpdateInternal(ExecutionContext<T> context, String query) {
+   final ExecutionContext<T>.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
+   // create job graph with dependencies
+   final String jobName = context.getSessionContext().getName() + 
": " + query;
+   final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+   // create execution
+   new Thread(new ProgramDeployer<>(context, jobName, jobGraph, 
null)).start();
--- End diff --

Yes, let me put it into a separate PR. StatusResult makes sense to me.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-29 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r199065915
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private <T> void executeUpdateInternal(ExecutionContext<T> context, String query) {
+   final ExecutionContext<T>.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
+   // create job graph with dependencies
+   final String jobName = context.getSessionContext().getName() + 
": " + query;
+   final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+   // create execution
+   new Thread(new ProgramDeployer<>(context, jobName, jobGraph, 
null)).start();
--- End diff --

@twalthr, for a sink-only table, I don't think the user needs to define any 
rowtimes on it, since it will never be used as a source. For a table that is 
both source and sink, when registering it as a sink, I think we only need to 
take care of the 'from-field' columns, since they map to actual data fields in 
the table. The `proctime` and 'from-source' columns we can just ignore when 
building the sink schema. Maybe we should have helper methods for building the 
source and sink schemas separately. Please correct me if I missed something 
here. What do you think?
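A self-contained sketch of that helper idea, with a hypothetical column model; 
only 'from-field' columns survive into the sink schema:

```scala
object SinkSchemaSketch {
  sealed trait FieldOrigin
  case object FromField extends FieldOrigin          // maps to a physical data field
  case object Proctime extends FieldOrigin           // computed processing time
  case object FromSourceRowtime extends FieldOrigin  // rowtime taken from the StreamRecord

  final case class Column(name: String, origin: FieldOrigin)

  // Keep only physical fields when deriving the sink schema.
  def sinkColumns(columns: Seq[Column]): Seq[Column] =
    columns.filter(_.origin == FromField)

  def main(args: Array[String]): Unit = {
    val cols = Seq(
      Column("id", FromField),
      Column("proc", Proctime),
      Column("rt", FromSourceRowtime))
    println(sinkColumns(cols).map(_.name)) // List(id)
  }
}
```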


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198678147
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connector/TableSinkFactoryServiceTest.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector
+
+import org.apache.flink.table.api.{NoMatchingTableConnectorException, 
TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.TableDescriptorValidator
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSinkFactoryServiceTest {
+  @Test
+  def testValidProperties(): Unit = {
+val props = properties()
+
assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != 
null)
+  }
+
+  @Test(expected = classOf[NoMatchingTableConnectorException])
+  def testInvalidContext(): Unit = {
+val props = properties()
+props.put(CONNECTOR_TYPE, "FAIL")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+val props = properties()
+props.put(CONNECTOR_PROPERTY_VERSION, "2")
+// the table source should still be found
+
assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != 
null)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+val props = properties()
+props.put("format.path_new", "/new/path")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+val props = properties()
+props.put("failing", "true")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  private def properties(): mutable.Map[String, String] = {
+val properties = mutable.Map[String, String]()
+properties.put(TableDescriptorValidator.TABLE_TYPE,
+  TableDescriptorValidator.TABLE_TYPE_VALUE_SINK)
+properties.put(CONNECTOR_TYPE, "test")
--- End diff --

good point, done


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198677922
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type,.
+*/
+  def tableType() : String
--- End diff --

sounds good.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198677762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
--- End diff --

Done


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198677754
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type,.
--- End diff --

done


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198215478
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.sql.Timestamp
+import java.util
+import java.util.Collections
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sinks.{AppendStreamTableSink, 
BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
+import org.apache.flink.table.sources._
+import org.apache.flink.table.util.TableConnectorUtil
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object MemoryTableSourceSinkUtil {
+  var tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]()
+
+  def clear = {
+MemoryTableSourceSinkUtil.tableData.clear()
+  }
+
+  class UnsafeMemoryTableSource(tableSchema: TableSchema,
+returnType: TypeInformation[Row],
--- End diff --

We usually indent differently. Take 
`org.apache.flink.table.codegen.CodeGenerator` as an example.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137127
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
 
-  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableConnectorFactory[_]])
 
-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): 
TableSource[_] = {
-findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+findAndCreateTableConnector(descriptor, null)
   }
 
-  def findAndCreateTableSource(
-  descriptor: TableSourceDescriptor,
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, 
classLoader: ClassLoader)
+  : T = {
 
 val properties = new DescriptorProperties()
 descriptor.addProperties(properties)
-findAndCreateTableSource(properties.asMap.asScala.toMap, classLoader)
+findAndCreateTableConnector(properties.asMap.asScala.toMap, 
classLoader)
   }
 
-  def findAndCreateTableSource(properties: Map[String, String]): 
TableSource[_] = {
-findAndCreateTableSource(properties, null)
+  def findAndCreateTableConnector(properties: Map[String, String]): T = {
+findAndCreateTableConnector(properties, null)
   }
 
-  def findAndCreateTableSource(
-  properties: Map[String, String],
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(properties: Map[String, String],
--- End diff --

This can fit into one line.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198160808
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+
+class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: 
Option[TableSourceTable[T1]],
--- End diff --

Call this `ExternalTable` or `ConnectorTable`?

Maybe we can add more helper methods here like `hasSource()`, `getSource` 
in order to reduce the many pattern matching changes in this PR.

I'm also wondering if we need the separate classes `TableSinkTable` and 
`TableSourceTable` anymore. We could store `TableSink` and `TableSource` 
directly in `UnifiedTable`.
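A sketch of the holder shape with the suggested helpers; the stand-in types 
below are hypothetical replacements for the PR's 
`TableSourceTable`/`TableSinkTable`:

```scala
object HolderSketch {
  // Hypothetical stand-ins for the PR's TableSourceTable / TableSinkTable.
  class SourceTable[T]
  class SinkTable[T]

  class TableSourceSinkTable[T1, T2](
      val tableSourceTableOpt: Option[SourceTable[T1]],
      val tableSinkTableOpt: Option[SinkTable[T2]]) {
    def hasSource: Boolean = tableSourceTableOpt.isDefined
    def hasSink: Boolean = tableSinkTableOpt.isDefined
  }
}
```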


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198146766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {
 
 // check for supported properties
 plainProperties.foreach { k =>
-  if (!supportedProperties.contains(k)) {
+  if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && 
!supportedProperties.contains(k)) {
 throw new ValidationException(
   s"Table factory '${factory.getClass.getCanonicalName}' does not 
support the " +
-  s"property '$k'. Supported properties are: \n" +
-  
s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
+s"property '$k'. Supported properties are: \n" +
+
s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
   }
 }
 
-// create the table source
+// create the table connector
 try {
   factory.create(properties.asJava)
 } catch {
   case t: Throwable =>
 throw new TableException(
-  s"Table source factory '${factory.getClass.getCanonicalName}' 
caused an exception.",
+  s"Table connector factory '${factory.getClass.getCanonicalName}' 
caused an exception.",
--- End diff --

There are more exception messages in this class that need an update.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198223665
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private <T> void executeUpdateInternal(ExecutionContext<T> context, String query) {
+   final ExecutionContext<T>.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
+   // create job graph with dependencies
+   final String jobName = context.getSessionContext().getName() + 
": " + query;
+   final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+   // create execution
+   new Thread(new ProgramDeployer<>(context, jobName, jobGraph, 
null)).start();
--- End diff --

I think even a detached job needs to return a result. Otherwise you cannot 
be sure if the job has been submitted or not. E.g., the cluster might not be 
reachable. In any case, every created thread should be managed by the result 
store. So we should have a similar architecture as for queries. Maybe instead 
of `CollectStreamResult` a `StatusResult`. Maybe we should do the SQL Client 
changes in a separate PR?
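A hypothetical sketch of the `StatusResult` idea; the names and fields are 
assumptions, not the eventual implementation:

```scala
// Even a detached INSERT INTO submission should hand something back that
// the result store can track: the submitted job's id, or the failure cause.
sealed trait Result
final case class StatusResult(jobId: String) extends Result
final case class ErrorResult(cause: Throwable) extends Result
```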


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135694
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type,.
--- End diff --

remove comma


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198146395
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {
 
 // check for supported properties
 plainProperties.foreach { k =>
-  if (!supportedProperties.contains(k)) {
+  if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && 
!supportedProperties.contains(k)) {
--- End diff --

The table type must not be part of the supported properties as it is 
defined in a separate method in the `TableConnectorFactory` interface.
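A minimal sketch of the exemption being requested, with hypothetical property 
sets:

```scala
object PropertyCheckSketch {
  // The reserved table-type key is declared via a separate factory method,
  // so it is exempted from the supported-properties validation.
  val reservedKeys = Set("type") // i.e. TableDescriptorValidator.TABLE_TYPE
  val supportedProperties = Set("connector.type", "format.type")

  def validate(keys: Set[String]): Unit =
    keys.filterNot(reservedKeys).foreach { k =>
      if (!supportedProperties(k))
        throw new IllegalArgumentException(s"Unsupported property '$k'.")
    }
}
```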


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198151603
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Do we need a `TableSinkDescriptor`? Can't we unify `TableSinkDescriptor` 
and `TableSourceDescriptor` to `TableDescriptor`?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198148852
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Please add a comment to this method, as it will be public in a Java API 
(similar to `TableSourceDescriptor`).


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198136187
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type.
+*/
+  def tableType() : String
--- End diff --

Rename to `getType()`?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135589
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

Actually, we could also simplify this and call it `TableFactory`. What do 
you think? After all, the SQL statement is `CREATE TABLE`, not `CREATE TABLE CONNECTOR`.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198147548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
 ---
@@ -27,6 +28,8 @@ class FileSystem extends ConnectorDescriptor(
 CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) {
 
   private var path: Option[String] = None
+  private var numFiles: Option[Int] = None
--- End diff --

Please remove these unrelated changes and open a separate issue for them.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198154676
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 ---
@@ -39,9 +39,12 @@ abstract class PhysicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
-  case _: StreamTableSourceTable[_] => true
-  case _: BatchTableSourceTable[_] => false
+val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) 
match {
+  case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
+case _: StreamTableSourceTable[_] => true
--- End diff --

The patterns are incorrect since the method returns an `Option`.
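
Something like this would unwrap it correctly (a sketch; the surrounding
names and the `TableException` fallback are assumed from the previous
version of this method):

    val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match {
      case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
        case Some(_: StreamTableSourceTable[_]) => true
        case Some(_: BatchTableSourceTable[_]) => false
        case _ => throw new TableException(s"Unknown table type $table.")
      }
      case _ => throw new TableException(s"Unknown table type $table.")
    }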


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137230
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
 
-  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableConnectorFactory[_]])
 
-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): 
TableSource[_] = {
-findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+findAndCreateTableConnector(descriptor, null)
   }
 
-  def findAndCreateTableSource(
-  descriptor: TableSourceDescriptor,
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, 
classLoader: ClassLoader)
--- End diff --

This can fit into one line.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137289
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

Make this class abstract?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198215161
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -712,7 +712,48 @@ class SqlITCase extends StreamingWithStateTestBase {
   "1,1,Hi,1970-01-01 00:00:00.001",
   "2,2,Hello,1970-01-01 00:00:00.002",
   "3,2,Hello world,1970-01-01 00:00:00.002")
-assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+assertEquals(expected.sorted, 
MemoryTableSourceSinkUtil.tableData.map(_.toString).sorted)
+  }
+
+  @Test
+  def testWriteReadTableSourceSink(): Unit = {
+var env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+var tEnv = TableEnvironment.getTableEnvironment(env)
+MemoryTableSourceSinkUtil.clear
+
+val t = StreamTestData.getSmall3TupleDataStream(env)
+  .assignAscendingTimestamps(x => x._2)
+  .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+tEnv.registerTable("sourceTable", t)
+
+val fieldNames = Array("a", "e", "f", "t")
+val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, 
Types.SQL_TIMESTAMP)
+  .asInstanceOf[Array[TypeInformation[_]]]
+
+val tableSchema = new TableSchema(
+  Array("a", "e", "f", "t", "rowtime", "proctime"),
+  Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP,
+Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
+val returnType = new RowTypeInfo(
+  Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+.asInstanceOf[Array[TypeInformation[_]]],
+  Array("a", "e", "f", "t"))
+tEnv.registerTableSource("targetTable", new 
MemoryTableSourceSinkUtil.UnsafeMemoryTableSource(
+  tableSchema, returnType, "rowtime", 3))
+tEnv.registerTableSink("targetTable",
+  new 
MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames, 
fieldTypes))
+
+tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM 
sourceTable")
+tEnv.sqlQuery("SELECT a, e, f, t, rowtime from targetTable")
--- End diff --

I think we need more test cases for how time attributes are handled for 
`both` table types; maybe not only ITCases but also unit tests. Also, `configure` 
is an internal method that should not be called here.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198142339
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -98,10 +98,12 @@ object TableSourceFactoryService extends Logging {
 plainContext.remove(STATISTICS_PROPERTY_VERSION)
 
 // check if required context is met
-if (plainContext.forall(e => properties.contains(e._1) && 
properties(e._1) == e._2)) {
+if 
(properties.get(TableDescriptorValidator.TABLE_TYPE).get.equals(factory.tableType())
 &&
--- End diff --

Consider the case where the type has not been set. By the way, 
`properties.get(TableDescriptorValidator.TABLE_TYPE).get` can be simplified to 
`properties(TableDescriptorValidator.TABLE_TYPE)`. It might be useful to enable 
more warnings in your IDE.
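
A sketch that also covers an unset type, using `Option#contains` instead
of the unsafe `.get` (names taken from the diff above):

    if (properties.get(TableDescriptorValidator.TABLE_TYPE).contains(factory.tableType()) &&
        plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
      // the factory matches both the requested table type and the context
    }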


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198207327
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connector/TableSinkFactoryServiceTest.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector
+
+import org.apache.flink.table.api.{NoMatchingTableConnectorException, 
TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.TableDescriptorValidator
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSinkFactoryServiceTest {
+  @Test
+  def testValidProperties(): Unit = {
+val props = properties()
+
assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != 
null)
+  }
+
+  @Test(expected = classOf[NoMatchingTableConnectorException])
+  def testInvalidContext(): Unit = {
+val props = properties()
+props.put(CONNECTOR_TYPE, "FAIL")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+val props = properties()
+props.put(CONNECTOR_PROPERTY_VERSION, "2")
+// the table source should still be found
+
assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != 
null)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+val props = properties()
+props.put("format.path_new", "/new/path")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+val props = properties()
+props.put("failing", "true")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  private def properties(): mutable.Map[String, String] = {
+val properties = mutable.Map[String, String]()
+properties.put(TableDescriptorValidator.TABLE_TYPE,
+  TableDescriptorValidator.TABLE_TYPE_VALUE_SINK)
+properties.put(CONNECTOR_TYPE, "test")
--- End diff --

I would use string literals here for everything (not the constants). That way 
the tests fail if we refactor one of the property keys.
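
For example (the literal key is an assumption about the constant's current
value):

    // hard-coding the key makes the test fail if CONNECTOR_TYPE is ever renamed
    properties.put("connector.type", "test")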


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198134945
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
--- End diff --

Add the updated comment again.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198219005
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, 
String query) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
--- End diff --

We also need to ship the query config here.
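
Something along these lines (a sketch in Scala notation; `getQueryConfig`
is assumed to be available on the environment instance):

    envInst.getTableEnvironment.sqlUpdate(query, envInst.getQueryConfig)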


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198218238
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, 
String query) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
--- End diff --

Wrap it into a try-catch similar to 
org.apache.flink.table.client.gateway.local.LocalExecutor#createTable.
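
A sketch of the suggested wrapping (Scala notation; `SqlExecutionException`
as thrown elsewhere in the gateway):

    try {
      envInst.getTableEnvironment.sqlUpdate(query)
    } catch {
      case t: Throwable =>
        // do not let an invalid statement take down the executor
        throw new SqlExecutionException("Invalid SQL update statement.", t)
    }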


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-22 Thread suez1224
GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/6201

[FLINK-8866][Table API & SQL] Add support for unified table sink 
instantiation

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

Add interfaces to support unified table sink configuration and 
instantiation. Consolidate table source and table sink configuration and 
instantiation.


## Brief change log

- Consolidate table sink and table source instantiation with 
TableConnectorFactory{Service}.
- Add support to register a Calcite table with both tableSource and 
tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.

## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for registering table source and sink tables under the same name.
  - Added integration tests for the INSERT INTO command in the SQL client.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-8866-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6201


commit fb1a021f86668f326f6322b52a324da8e6eb1f47
Author: Shuyi Chen 
Date:   2018-06-19T19:00:34Z

- Add unified table sink instantiation.
- Consolidate table sink and table source instantiation.
- Add support to register a Calcite table with both tableSource and 
tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.




---