[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544495#comment-16544495 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6201

> Create unified interfaces to configure and instantiate TableSinks
> -----------------------------------------------------------------
>
>           Key: FLINK-8866
>           URL: https://issues.apache.org/jira/browse/FLINK-8866
>       Project: Flink
>    Issue Type: New Feature
>    Components: Table API & SQL
>      Reporter: Timo Walther
>      Assignee: Shuyi Chen
>      Priority: Major
>        Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure
> and instantiate TableSinks. Among other applications, this is necessary in
> order to declare table sinks in an environment file of the SQL client, so
> that the sink can be used for {{INSERT INTO}} statements.
>
> Below are a few major changes in mind:
> 1) Add TableSinkFactory/TableSinkFactoryService similar to
>    TableSourceFactory/TableSourceFactoryService.
> 2) Add a common property called "type" with the values source, sink, and both
>    for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use the table type
>    to identify whether an entry is a source or a sink.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
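Point 3 above can be illustrated with a sketch of what such an environment file might look like. The property names and layout here are assumptions for illustration, not the final format adopted by Flink:

```yaml
# Hypothetical SQL client environment file: a single "tables" list
# replaces the old "sources" list, and each entry declares its role
# through a "type" attribute (source, sink, or both).
tables:
  - name: Orders              # illustrative table name
    type: source
    connector:
      type: filesystem
      path: /tmp/orders.csv
    format:
      type: csv
  - name: Results
    type: sink
    connector:
      type: filesystem
      path: /tmp/results.csv
    format:
      type: csv
```

With a layout like this, an entry of type `sink` (or `both`) is what makes the table eligible as a target for `INSERT INTO` in the SQL client.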
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536522#comment-16536522 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user suez1224 closed the pull request at:

    https://github.com/apache/flink/pull/6201
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536523#comment-16536523 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

GitHub user suez1224 reopened a pull request:

    https://github.com/apache/flink/pull/6201

[FLINK-8866][Table API & SQL] Add support for unified table sink instantiation

## What is the purpose of the change

Add interfaces to support unified table sink configuration and instantiation, and consolidate table source and table sink configuration and instantiation.

## Brief change log

- Consolidate table sink and table source instantiation with TableConnectorFactory{Service}.
- Add support to register a Calcite table with both a table source and a table sink.
- Add INSERT command support in the SQL client.
- Add CsvTableSinkFactory.

## Verifying this change

This change added tests and can be verified as follows:

- Added integration tests for registering source and sink tables under the same name.
- Added integration tests for the INSERT INTO command in the SQL client.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink FLINK-8866-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6201.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6201

commit d436041fda022f405fa3a1d497b95d2ff0934ce5
Author: Shuyi Chen
Date: 2018-06-19T19:00:34Z

    - Add unified table sink instantiation.
    - Consolidate table sink and table source instantiation.
    - Add support to register a Calcite table with both tableSource and tableSink.
    - Add Insert command support in SQL client.
    - Add CsvTableSinkFactory.

commit d70d033abec0806f1723b18f7bcbab1f60ec7280
Author: Shuyi Chen
Date: 2018-06-28T18:30:21Z

    comment fixes

commit 0649359106ddfa003c44e3d0221f7f52ac507593
Author: Shuyi Chen
Date: 2018-07-06T23:31:05Z

    refactor:
    1) add TableFactoryDiscoverable trait
    2) add util for handling rowtime/proctime for table schema and unit tests
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535682#comment-16535682 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r200806113

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala ---

    @@ -39,9 +39,12 @@ abstract class PhysicalTableSourceScan(
       override def deriveRowType(): RelDataType = {
         val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    -    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
    -      case _: StreamTableSourceTable[_] => true
    -      case _: BatchTableSourceTable[_] => false
    +    val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match {
    +      case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
    +        case _: StreamTableSourceTable[_] => true
    --- End diff --

Good catch.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535680#comment-16535680 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r200806045

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala ---

    @@ -27,6 +28,8 @@ class FileSystem extends ConnectorDescriptor(
         CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) {

       private var path: Option[String] = None
    +  private var numFiles: Option[Int] = None
    --- End diff --

This is for the CsvTableSinkFactory.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535678#comment-16535678 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r200805902

--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---

    @@ -56,23 +58,44 @@ public Environment() {
         return tables;
       }

    +  private static TableDescriptor create(String name, Map<String, Object> config) {
    +    if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
    +      throw new SqlClientException("The 'type' attribute of a table is missing.");
    +    }
    +    final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE());
    +    if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
    +      return new Source(name, ConfigUtil.normalizeYaml(config));
    +    } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
    +      return new Sink(name, ConfigUtil.normalizeYaml(config));
    +    } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
    +      return new SourceSink(name, ConfigUtil.normalizeYaml(config));
    +    }
    +    return null;
    +  }
    +
       public void setTables(List<Map<String, Object>> tables) {
         this.tables = new HashMap<>(tables.size());
         tables.forEach(config -> {
    -      if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
    -        throw new SqlClientException("The 'type' attribute of a table is missing.");
    +      if (!config.containsKey(NAME)) {
    +        throw new SqlClientException("The 'name' attribute of a table is missing.");
           }
    -      if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
    -        config.remove(TableDescriptorValidator.TABLE_TYPE());
    -        final Source s = Source.create(config);
    -        if (this.tables.containsKey(s.getName())) {
    -          throw new SqlClientException("Duplicate source name '" + s + "'.");
    -        }
    -        this.tables.put(s.getName(), s);
    -      } else {
    +      final Object name = config.get(NAME);
    +      if (name == null || !(name instanceof String) || ((String) name).length() <= 0) {
    +        throw new SqlClientException("Invalid table name '" + name + "'.");
    +      }
    +      final String tableName = (String) name;
    +      final Map<String, Object> properties = new HashMap<>(config);
    +      properties.remove(NAME);
    +
    +      TableDescriptor tableDescriptor = create(tableName, properties);
    +      if (null == tableDescriptor) {
             throw new SqlClientException(
    -          "Invalid table 'type' attribute value, only 'source' is supported");
    +          "Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
    +      }
    +      if (this.tables.containsKey(tableName)) {
    +        throw new SqlClientException("Duplicate table name '" + tableName + "'.");
    --- End diff --

The current implementation allows only source and sink in one table.
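The validation flow in the diff above can be condensed into a small standalone sketch. The class and key names here are simplified assumptions for illustration, not the SQL client's actual API: every table entry needs a non-empty `name` and a `type` of `source`, `sink`, or `both`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified sketch of the table-entry validation discussed above.
 * Returns a label for the descriptor kind, or throws on bad input.
 */
public class TableEntryValidation {

    static String classify(Map<String, Object> config) {
        // 'name' must be present and a non-empty string.
        Object name = config.get("name");
        if (!(name instanceof String) || ((String) name).isEmpty()) {
            throw new IllegalArgumentException("Invalid table name '" + name + "'.");
        }
        // 'type' selects which descriptor to build.
        Object type = config.get("type");
        if (type == null) {
            throw new IllegalArgumentException("The 'type' attribute of a table is missing.");
        }
        switch ((String) type) {
            case "source": return "source";
            case "sink":   return "sink";
            case "both":   return "source-sink";
            default:
                throw new IllegalArgumentException(
                    "Invalid table 'type' attribute value, only 'source', 'sink' or 'both' is supported");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("name", "orders");
        entry.put("type", "both");
        System.out.println(classify(entry)); // prints "source-sink"
    }
}
```

Note the `instanceof` check covers the null case as well, since `null instanceof String` is false.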
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535676#comment-16535676 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user suez1224 commented on a diff in the pull request (same Environment.java hunk as above, at the line):

    https://github.com/apache/flink/pull/6201#discussion_r200805881

    +          "Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
    --- End diff --

Good catch, thanks.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534329#comment-16534329 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user walterddr commented on a diff in the pull request (Environment.java, at the duplicate-name check):

    https://github.com/apache/flink/pull/6201#discussion_r200521222

    +      if (this.tables.containsKey(tableName)) {
    +        throw new SqlClientException("Duplicate table name '" + tableName + "'.");
    --- End diff --

If only `"source"` and `"sink"` are allowed, should we allow the same name with different types, e.g. `{"name": "t1", "type": "source"}` and `{"name": "t1", "type": "sink"}` co-existing? This follows up on the previous comment; I think we just need one approach, either should work.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534332#comment-16534332 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r200524249

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala ---

    (new file; Apache license header omitted)

    +package org.apache.flink.table.descriptors
    +
    +/**
    +  * Common class for all descriptors describing a table sink.
    +  */
    +abstract class TableSinkDescriptor extends TableDescriptor {
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    --- End diff --

+1, should be able to unify.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534334#comment-16534334 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r200524110

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala ---

    @@ -16,57 +16,57 @@
      * limitations under the License.
      */

    -package org.apache.flink.table.sources
    +package org.apache.flink.table.connector

     import java.util.{ServiceConfigurationError, ServiceLoader}

    -import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
    -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
    -import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
    -import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
    -import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
    -import org.apache.flink.table.descriptors._
    +import org.apache.flink.table.api._
    +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
    +import org.apache.flink.table.descriptors.FormatDescriptorValidator._
    +import org.apache.flink.table.descriptors.MetadataValidator._
    +import org.apache.flink.table.descriptors.StatisticsValidator._
    +import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.sinks.TableSink
    +import org.apache.flink.table.sources.TableSource
     import org.apache.flink.table.util.Logging

    -import scala.collection.JavaConverters._
    -import scala.collection.mutable
    +import _root_.scala.collection.JavaConverters._
    +import _root_.scala.collection.mutable

     /**
    -  * Service provider interface for finding suitable table source factories for the given properties.
    +  * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]]
    +  * and [[org.apache.flink.table.sinks.TableSink]].
       */
    -object TableSourceFactoryService extends Logging {
    +class TableConnectorFactoryService[T] extends Logging {
    --- End diff --

+1
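The factory service being discussed is essentially a property-matching lookup over factories discovered via `java.util.ServiceLoader`. The standalone sketch below shows only the matching step, with illustrative interface and method names (not Flink's actual API): a factory advertises the properties it requires, and the service returns every factory whose requirements are all satisfied.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hedged sketch of unified factory discovery for both sources and sinks.
 * Names are assumptions for illustration only.
 */
public class ConnectorFactoryLookup {

    interface TableConnectorFactory {
        Map<String, String> requiredContext(); // e.g. connector.type -> filesystem
        String tableType();                    // "source", "sink", or "both"
    }

    /** Return every factory whose required context is satisfied by the given properties. */
    static List<TableConnectorFactory> findMatching(
            List<TableConnectorFactory> factories, Map<String, String> properties) {
        List<TableConnectorFactory> matches = new ArrayList<>();
        for (TableConnectorFactory factory : factories) {
            boolean allMatch = true;
            for (Map.Entry<String, String> required : factory.requiredContext().entrySet()) {
                if (!required.getValue().equals(properties.get(required.getKey()))) {
                    allMatch = false;
                    break;
                }
            }
            if (allMatch) {
                matches.add(factory);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        TableConnectorFactory csvSinkFactory = new TableConnectorFactory() {
            @Override public Map<String, String> requiredContext() {
                Map<String, String> ctx = new HashMap<>();
                ctx.put("connector.type", "filesystem");
                ctx.put("format.type", "csv");
                return ctx;
            }
            @Override public String tableType() { return "sink"; }
        };

        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "filesystem");
        properties.put("format.type", "csv");
        properties.put("connector.path", "/tmp/out.csv");

        System.out.println(findMatching(List.of(csvSinkFactory), properties).size()); // prints 1
    }
}
```

A real implementation would additionally load the factories with `ServiceLoader` and raise "no matching" or "ambiguous" exceptions when zero or several factories match.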
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks

[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534328#comment-16534328 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user walterddr commented on a diff in the pull request (Environment.java, at the error message):

    https://github.com/apache/flink/pull/6201#discussion_r200521017

             throw new SqlClientException(
    -          "Invalid table 'type' attribute value, only 'source' is supported");
    +          "Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
    --- End diff --

Missing `both`?
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534330#comment-16534330 ] ASF GitHub Bot commented on FLINK-8866: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200522491 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala --- @@ -160,10 +173,34 @@ abstract class BatchTableEnvironment( throw new TableException("Same number of field names and types required.") } -tableSink match { +val configuredSink = tableSink.configure(fieldNames, fieldTypes) +registerTableSinkInternal(name, configuredSink) + } + + def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = { --- End diff -- could probably move this to base class `TableEnvironment`?
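The refactoring in this hunk — configure the sink with field names and types first, then hand the configured copy to a shared internal registration path — can be sketched independently of Flink. The names below mirror the diff but are illustrative, not the real `TableEnvironment` API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: a sink is first 'configured' with the target schema,
// then the configured copy is registered under a name. Mirrors the split into
// registerTableSink(name, fields, types, sink) and a shared internal method
// that the reviewer suggests hoisting into a base class.
class SinkRegistry {
    interface Sink {
        Sink configure(String[] fieldNames, Class<?>[] fieldTypes);
    }

    private final Map<String, Sink> tables = new HashMap<>();

    void registerTableSink(String name, String[] fieldNames, Class<?>[] fieldTypes, Sink sink) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("Same number of field names and types required.");
        }
        registerTableSinkInternal(name, sink.configure(fieldNames, fieldTypes));
    }

    // Shared path; both the schema-configuring overload above and a direct
    // registerTableSink(name, configuredSink) overload would funnel through here.
    void registerTableSinkInternal(String name, Sink configuredSink) {
        if (tables.putIfAbsent(name, configuredSink) != null) {
            throw new IllegalArgumentException("Table '" + name + "' already exists.");
        }
    }

    boolean isRegistered(String name) {
        return tables.containsKey(name);
    }
}
```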
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534333#comment-16534333 ] ASF GitHub Bot commented on FLINK-8866: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200523824 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Statistic +import org.apache.calcite.schema.impl.AbstractTable + +class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]], --- End diff -- Huge +1. My understanding is this will be the overall class to hold a table source, sink or both. `TableSourceSinkTable` seems redundant. 
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534331#comment-16534331 ] ASF GitHub Bot commented on FLINK-8866: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200524149 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala --- @@ -16,57 +16,57 @@ * limitations under the License. */ -package org.apache.flink.table.sources +package org.apache.flink.table.connector import java.util.{ServiceConfigurationError, ServiceLoader} -import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException} -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION -import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION -import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION -import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION -import org.apache.flink.table.descriptors._ +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator._ +import org.apache.flink.table.descriptors.MetadataValidator._ +import org.apache.flink.table.descriptors.StatisticsValidator._ +import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.sinks.TableSink +import org.apache.flink.table.sources.TableSource import org.apache.flink.table.util.Logging -import scala.collection.JavaConverters._ -import scala.collection.mutable +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable /** - * Service provider interface for finding suitable table source factories 
for the given properties. + * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]] + * and [[org.apache.flink.table.sinks.TableSink]]. */ -object TableSourceFactoryService extends Logging { +class TableConnectorFactoryService[T] extends Logging { --- End diff -- also just `TableFactoryService` ?
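The service under discussion discovers factories through Java's Service Provider Interface and matches them against normalized properties. A minimal sketch of the matching step, with a plain list standing in for `ServiceLoader` so it runs without a `META-INF/services` entry (the `requiredContext` name is illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch of the matching core of a TableFactoryService: every factory
// declares a required context, and a factory matches when all of its
// context entries appear in the supplied properties. Real discovery
// would obtain the candidates via ServiceLoader.load(Factory.class).
class FactoryService {
    interface Factory {
        Map<String, String> requiredContext();
    }

    static Optional<Factory> find(List<Factory> factories, Map<String, String> properties) {
        return factories.stream()
                .filter(f -> f.requiredContext().entrySet().stream()
                        .allMatch(e -> e.getValue().equals(properties.get(e.getKey()))))
                .findFirst();
    }
}
```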
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534327#comment-16534327 ] ASF GitHub Bot commented on FLINK-8866: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200523261 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala --- @@ -16,21 +16,18 @@ * limitations under the License. */ -package org.apache.flink.table.sources +package org.apache.flink.table.connector import java.util -/** - * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider - * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that - * describe the desired table source. The factory allows for matching to the given set of - * properties and creating a configured [[TableSource]] accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in - * the current classpath to be found. - */ -trait TableSourceFactory[T] { +trait TableConnectorFactory[T] { --- End diff -- +1 I think the most baffling point I have read until this point was the `Table*Connector*Factory` part :-)
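The environment-file change proposed in the issue (replace the top-level `sources` list with `tables` and tag each entry with a type) might look like this in a SQL Client YAML file. The connector keys below are illustrative, not verbatim from the PR:

```yaml
tables:                       # replaces the former top-level 'sources' list
  - name: taxi_rides
    type: source              # one of: source, sink, both
    connector:
      type: kafka
  - name: ride_counts
    type: sink
    connector:
      type: kafka
```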
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532428#comment-16532428 ] ASF GitHub Bot commented on FLINK-8866: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200045769 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala --- @@ -16,21 +16,18 @@ * limitations under the License. */ -package org.apache.flink.table.sources +package org.apache.flink.table.connector import java.util -/** - * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider - * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that - * describe the desired table source. The factory allows for matching to the given set of - * properties and creating a configured [[TableSource]] accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in - * the current classpath to be found. - */ -trait TableSourceFactory[T] { +trait TableConnectorFactory[T] { --- End diff -- @suez1224 Actually, I liked `CREATE TABLE` because it is closer to SQL. The reason why I proposed `TableFactory` was because the factory does much more than just constructing a connector. It also performs schema validation, format discovery and so on.
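The argument that the factory does more than construct a connector can be outlined as a small pipeline. All property keys and the method name below are hypothetical illustrations of the three responsibilities named in the comment, not the real `TableFactory` API:

```java
import java.util.Map;

// Hypothetical outline of a table factory's work beyond construction:
// 1. validate the declared schema properties,
// 2. discover a matching format,
// 3. only then build the table connector.
class TableFactorySketch {
    static String createTable(Map<String, String> props) {
        if (!props.containsKey("schema.0.name")) {                  // 1. schema validation
            throw new IllegalArgumentException("Table schema is missing.");
        }
        String format = props.getOrDefault("format.type", "raw");   // 2. format discovery
        String connector = props.get("connector.type");             // 3. connector construction
        return connector + "+" + format;
    }
}
```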
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531930#comment-16531930 ] ASF GitHub Bot commented on FLINK-8866: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r199951101 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala --- @@ -16,21 +16,18 @@ * limitations under the License. */ -package org.apache.flink.table.sources +package org.apache.flink.table.connector import java.util -/** - * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider - * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that - * describe the desired table source. The factory allows for matching to the given set of - * properties and creating a configured [[TableSource]] accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in - * the current classpath to be found. - */ -trait TableSourceFactory[T] { +trait TableConnectorFactory[T] { --- End diff -- Sounds good, also, I've updated the DDL design doc to call it TABLE CONNECTOR, which I think is clearer.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531920#comment-16531920 ] ASF GitHub Bot commented on FLINK-8866: --- Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/6201 @twalthr @fhueske sounds good to me. We can do that in a follow-up issue for `from-source`, and we will not support `from-source` in this PR.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531237#comment-16531237 ] ASF GitHub Bot commented on FLINK-8866: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/6201 I agree with @fhueske. Let's do the `from-source` in a follow-up issue. I will open a PR soon for FLINK-8558 which separates connector and format. For this I also introduced a method `KafkaTableSource#supportsKafkaTimestamps`. The `KafkaTableFactory` can read this property and throw an exception accordingly.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16531114#comment-16531114 ] ASF GitHub Bot commented on FLINK-8866: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6201 Hi @suez1224, that sounds good overall. :-) A few comments:
- I would not add a user-facing property `connector.support-timestamp` because a user chooses that by choosing the connector type. Whether the connector supports writing a system timestamp can be an internal field/annotation/interface of the `TableSink` that is generated from the properties.
- Copying the timestamp to the StreamRecord timestamp field can be done with a process function. Actually, we do that already when converting a Table into a DataStream. Setting the flag in the Kafka TableSink should be easy.
- Not sure if `from-source` needs to be supported by the initial version. We could just implement `from-field` for now, and handle `from-source` as a follow-up issue. Since we are approaching feature freeze, I think this might be a good idea at this point.
What do you think? Fabian
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530939#comment-16530939 ] ASF GitHub Bot commented on FLINK-8866: --- Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/6201 @fhueske @twalthr thanks for the comments. For `from-source`, the only systems I know of are Kafka 0.10 and 0.11, which support writing a record along with a timestamp. To support `from-source` in a table sink, I think we can do the following:
1) Add a connector property, e.g. `connector.support-timestamp`. Only if `connector.support-timestamp` is true will we allow the sink table schema to contain a field with rowtime type `from-source`. Otherwise, an exception will be thrown.
2) If the condition in 1) is satisfied, we will create a corresponding rowtime field in the sink table schema with type LONG. In TableEnvironment.insertInto(), we will validate the sink schema against the insertion source. Also, in the TableSink.emitDataStream() implementation, we will need to insert a timestamp assigner operator to set StreamRecord.timestamp (should we reuse an existing interface, or create a new timestampInserter interface?) and remove the extra rowtime field from StreamRecord.value before we emit the datastream to the sink. (For KafkaTableSink, we will also need to invoke setWriteTimestampToKafka(true).)
Please correct me if I missed something here. What do you think?
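The "timestamp inserter" step proposed above — copy a rowtime field into the record's system timestamp and drop it from the payload before emitting — can be sketched without Flink types. `Record` here is a hypothetical stand-in for `StreamRecord`, not the real class:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a stream record: a payload row plus a system timestamp.
class Record {
    final List<Object> fields;
    final long timestamp;

    Record(List<Object> fields, long timestamp) {
        this.fields = fields;
        this.timestamp = timestamp;
    }

    // Sketch of the inserter operator: read the rowtime field at the given
    // index, set it as the record timestamp, and remove it from the row so a
    // sink like Kafka (with writeTimestampToKafka enabled) can persist it
    // separately from the payload.
    static Record insertTimestamp(List<Object> row, int rowtimeIndex) {
        long ts = (Long) row.get(rowtimeIndex);
        List<Object> rest = new ArrayList<>(row);
        rest.remove(rowtimeIndex);
        return new Record(rest, ts);
    }
}
```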
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527653#comment-16527653 ] ASF GitHub Bot commented on FLINK-8866: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6201 Hi, I think timestamp fields of source-sink tables should be handled as follows when emitting the table:
- `proc-time`: ignore.
- `from-field`: simply write out the timestamp as part of the row.
- `from-source`: write the timestamp separately to the system and remove it from the row. This only works if we can set the timestamp on the sink system. If the system sets the ingestion timestamp on its own, i.e., not the actual value, rows would contain different timestamps when they are ingested. If the sink system does not support setting a timestamp, we cannot allow such a table definition.
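These three rules can be expressed as a small filter over the declared field kinds. The kind names mirror the discussion; the function itself is an illustrative sketch, not Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of deriving the emitted sink row from a source-sink table schema:
// proc-time fields are dropped, from-field timestamps stay in the row, and
// from-source timestamps are routed to the system timestamp (also dropped
// from the row).
class SinkSchema {
    static Map<String, String> emittedFields(Map<String, String> fieldKinds) {
        Map<String, String> out = new LinkedHashMap<>();
        fieldKinds.forEach((name, kind) -> {
            if (kind.equals("regular") || kind.equals("from-field")) {
                out.put(name, kind);   // written as part of the row
            }
            // "proc-time": ignored; "from-source": written via record timestamp
        });
        return out;
    }
}
```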
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527379#comment-16527379 ] ASF GitHub Bot commented on FLINK-8866: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/6201 @suez1224 Yes, sounds good to me. Only `from-field` timestamps matter right now. We should also think of the opposite of a timestamp extractor (a timestamp inserter) for cases where the rowtime needs some preprocessing (e.g., concatenation of a DATE and a TIME column), but we can deal with such cases in a follow-up issue. A helper method would be useful. We already have something similar in `SchemaValidator` for schema derivation.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527250#comment-16527250 ] ASF GitHub Bot commented on FLINK-8866: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r199066851 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.utils + +import java.sql.Timestamp +import java.util +import java.util.Collections + +import org.apache.flink.api.common.io.RichOutputFormat +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase} +import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp +import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps +import org.apache.flink.table.sources._ +import org.apache.flink.table.util.TableConnectorUtil +import org.apache.flink.types.Row + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +object MemoryTableSourceSinkUtil { + var tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]() + + def clear = { +MemoryTableSourceSinkUtil.tableData.clear() + } + + class UnsafeMemoryTableSource(tableSchema: TableSchema, +returnType: TypeInformation[Row], --- End diff -- done
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527249#comment-16527249 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r199066578

--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
 		}
 	}
 
+	private void executeUpdateInternal(ExecutionContext context, String query) {
+		final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+		envInst.getTableEnvironment().sqlUpdate(query);
+		// create job graph with dependencies
+		final String jobName = context.getSessionContext().getName() + ": " + query;
+		final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+		// create execution
+		new Thread(new ProgramDeployer<>(context, jobName, jobGraph, null)).start();
--- End diff --

Yes, let me put it into a separate PR. StatusResult makes sense to me.

> Create unified interfaces to configure and instatiate TableSinks
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: Timo Walther
> Assignee: Shuyi Chen
> Priority: Major
> Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure
> and instantiate TableSinks. Among other applications, this is necessary in
> order to declare table sinks in an environment file of the SQL client, so
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind:
> 1) Add TableSinkFactory/TableSinkFactoryService similar to
> TableSourceFactory/TableSourceFactoryService.
> 2) Add a common property called "type" with values (source, sink and both)
> for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use tableType to
> identify whether it's a source or a sink.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
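The submission pattern in the diff above — composing a job name from the session name and the statement, then handing the job graph to a deployer on a separate thread — can be sketched in isolation. This is a minimal stand-in, not Flink's actual API: `Deployer` is a hypothetical substitute for `ProgramDeployer`, and the latch only makes the example deterministic where the real SQL client would instead return a status result.

```java
import java.util.concurrent.CountDownLatch;

public class AsyncSubmitSketch {

    // Hypothetical stand-in for ProgramDeployer: runs deployment off the caller's thread.
    static class Deployer implements Runnable {
        private final String jobName;
        private final CountDownLatch done;

        Deployer(String jobName, CountDownLatch done) {
            this.jobName = jobName;
            this.done = done;
        }

        @Override
        public void run() {
            // the actual cluster submission would happen here
            done.countDown();
        }
    }

    static String submit(String sessionName, String query) throws InterruptedException {
        // job name combines session and statement, as in the PR
        final String jobName = sessionName + ": " + query;
        final CountDownLatch done = new CountDownLatch(1);
        new Thread(new Deployer(jobName, done)).start();
        done.await(); // only for the sketch; the real executor is fire-and-forget here
        return jobName;
    }
}
```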
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527247#comment-16527247 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user suez1224 commented on the issue:
https://github.com/apache/flink/pull/6201

@twalthr, for a sink-only table, I don't think the user needs to define any rowtime attributes on it, since it will never be used as a source. For a table that is both source and sink, when registering it as a sink, I think we only need to take care of the 'from-field' columns, since they map to actual data fields in the table. For proctime and 'from-source' columns, we can just ignore them when building the sink schema. Maybe we should have helper methods for building the schema for source and sink separately. Please correct me if I missed something here. What do you think?
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527246#comment-16527246 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r199065915

--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
 		}
 	}
 
+	private void executeUpdateInternal(ExecutionContext context, String query) {
+		final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+		envInst.getTableEnvironment().sqlUpdate(query);
+		// create job graph with dependencies
+		final String jobName = context.getSessionContext().getName() + ": " + query;
+		final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+		// create execution
+		new Thread(new ProgramDeployer<>(context, jobName, jobGraph, null)).start();
--- End diff --

@twalthr, for a sink-only table, I don't think the user needs to define any rowtime attributes on it, since it will never be used as a source. For a table that is both source and sink, when registering it as a sink, I think we only need to take care of the 'from-field' columns, since they map to actual data fields in the table. For `proctime` and 'from-source' columns, we can just ignore them when building the sink schema. Maybe we should have helper methods for building the schema for source and sink separately. Please correct me if I missed something here. What do you think?
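The schema-splitting idea in the comment above — keeping only 'from-field' columns in the sink schema and dropping proctime / 'from-source' time attributes — can be sketched with plain collections. `Column` and `Origin` are hypothetical stand-ins for this sketch, not Flink's schema types.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkSchemaSketch {

    // Hypothetical column origins mirroring the discussion.
    enum Origin { FROM_FIELD, FROM_SOURCE, PROCTIME }

    static class Column {
        final String name;
        final Origin origin;

        Column(String name, Origin origin) {
            this.name = name;
            this.origin = origin;
        }
    }

    // Keep 'from-field' columns (they map to actual data fields);
    // ignore proctime and 'from-source' time attributes for the sink.
    static List<String> deriveSinkFields(List<Column> schema) {
        List<String> sink = new ArrayList<>();
        for (Column c : schema) {
            if (c.origin == Origin.FROM_FIELD) {
                sink.add(c.name);
            }
        }
        return sink;
    }
}
```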
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525764#comment-16525764 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198678147

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connector/TableSinkFactoryServiceTest.scala ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector
+
+import org.apache.flink.table.api.{NoMatchingTableConnectorException, TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.TableDescriptorValidator
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSinkFactoryServiceTest {
+
+  @Test
+  def testValidProperties(): Unit = {
+    val props = properties()
+    assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[NoMatchingTableConnectorException])
+  def testInvalidContext(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, "FAIL")
+    TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_PROPERTY_VERSION, "2")
+    // the table source should still be found
+    assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+    val props = properties()
+    props.put("format.path_new", "/new/path")
+    TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+    val props = properties()
+    props.put("failing", "true")
+    TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  private def properties(): mutable.Map[String, String] = {
+    val properties = mutable.Map[String, String]()
+    properties.put(TableDescriptorValidator.TABLE_TYPE,
+      TableDescriptorValidator.TABLE_TYPE_VALUE_SINK)
+    properties.put(CONNECTOR_TYPE, "test")
--- End diff --

good point, done
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525761#comment-16525761 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198677922

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
- * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
- * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
- * describe the desired table source. The factory allows for matching to the given set of
- * properties and creating a configured [[TableSource]] accordingly.
- *
- * Classes that implement this interface need to be added to the
- * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
- * the current classpath to be found.
- */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+
+  /**
+    * Specify the type of the table connector, check
+    * [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for all values.
+    *
+    * @return the table connector type,.
+    */
+  def tableType() : String
--- End diff --

sounds good.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525755#comment-16525755 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198677754

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
- * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
- * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
- * describe the desired table source. The factory allows for matching to the given set of
- * properties and creating a configured [[TableSource]] accordingly.
- *
- * Classes that implement this interface need to be added to the
- * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
- * the current classpath to be found.
- */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+
+  /**
+    * Specify the type of the table connector, check
+    * [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for all values.
+    *
+    * @return the table connector type,.
--- End diff --

done
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525757#comment-16525757 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198677762

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
--- End diff --

Done
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524016#comment-16524016 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198218238

--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
 		}
 	}
 
+	private void executeUpdateInternal(ExecutionContext context, String query) {
+		final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+		envInst.getTableEnvironment().sqlUpdate(query);
--- End diff --

Wrap it into a try-catch similar to org.apache.flink.table.client.gateway.local.LocalExecutor#createTable.
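The try-catch wrapping suggested above can be sketched without Flink dependencies. `TableEnv` and `SqlClientException` are hypothetical stand-ins for this sketch; the point is the pattern of catching planner failures and rethrowing them as a client-level exception with the offending statement attached.

```java
public class UpdateWrapSketch {

    // Hypothetical stand-in for the SQL client's exception type.
    static class SqlClientException extends RuntimeException {
        SqlClientException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    // Minimal stand-in for the table environment's update entry point.
    interface TableEnv {
        void sqlUpdate(String statement);
    }

    static void executeUpdate(TableEnv env, String query) {
        try {
            env.sqlUpdate(query);
        } catch (Exception e) {
            // wrap and rethrow with context, in the spirit of LocalExecutor#createTable
            throw new SqlClientException("Invalid SQL update statement: " + query, e);
        }
    }
}
```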
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523996#comment-16523996 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198135204

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
--- End diff --

Use plural `connectors`
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523997#comment-16523997 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198137127

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
- * Service provider interface for finding suitable table source factories for the given properties.
+ * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]]
+ * and [[org.apache.flink.table.sinks.TableSink]].
  */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
 
-  private lazy val defaultLoader = ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = ServiceLoader.load(classOf[TableConnectorFactory[_]])
 
-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): TableSource[_] = {
-    findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+    findAndCreateTableConnector(descriptor, null)
   }
 
-  def findAndCreateTableSource(
-      descriptor: TableSourceDescriptor,
-      classLoader: ClassLoader)
-    : TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, classLoader: ClassLoader)
+    : T = {
     val properties = new DescriptorProperties()
     descriptor.addProperties(properties)
-    findAndCreateTableSource(properties.asMap.asScala.toMap, classLoader)
+    findAndCreateTableConnector(properties.asMap.asScala.toMap, classLoader)
   }
 
-  def findAndCreateTableSource(properties: Map[String, String]): TableSource[_] = {
-    findAndCreateTableSource(properties, null)
+  def findAndCreateTableConnector(properties: Map[String, String]): T = {
+    findAndCreateTableConnector(properties, null)
  }
 
-  def findAndCreateTableSource(
-      properties: Map[String, String],
-      classLoader: ClassLoader)
-    : TableSource[_] = {
+  def findAndCreateTableConnector(properties: Map[String, String],
--- End diff --

This can fit into one line.
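The unified lookup that this refactoring introduces — one service matching normalized properties against each registered factory — can be sketched generically. This is a simplified stand-in, not the actual TableConnectorFactoryService: `Factory` and the in-memory factory list replace the ServiceLoader-based discovery, and the matching rule (every entry of the factory's required context must appear in the properties) is the assumed behavior.

```java
import java.util.List;
import java.util.Map;

public class FactoryLookupSketch {

    // Hypothetical factory contract: a required context (e.g. connector.type -> kafka)
    // plus a creation method over the full normalized property map.
    interface Factory {
        Map<String, String> requiredContext();
        String create(Map<String, String> properties);
    }

    static String findAndCreate(List<Factory> factories, Map<String, String> properties) {
        for (Factory f : factories) {
            // a factory matches when all of its required context entries are present
            boolean matches = f.requiredContext().entrySet().stream()
                .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
            if (matches) {
                return f.create(properties);
            }
        }
        throw new IllegalArgumentException("No matching table connector factory found.");
    }
}
```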
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524000#comment-16524000 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198146766

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {
 
     // check for supported properties
     plainProperties.foreach { k =>
-      if (!supportedProperties.contains(k)) {
+      if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && !supportedProperties.contains(k)) {
         throw new ValidationException(
           s"Table factory '${factory.getClass.getCanonicalName}' does not support the " +
-          s"property '$k'. Supported properties are: \n" +
-          s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
+            s"property '$k'. Supported properties are: \n" +
+            s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
       }
     }
 
-    // create the table source
+    // create the table connector
     try {
       factory.create(properties.asJava)
     } catch {
       case t: Throwable =>
         throw new TableException(
-          s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+          s"Table connector factory '${factory.getClass.getCanonicalName}' caused an exception.",
--- End diff --

There are more exception messages in this class that need an update.
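The supported-property check in the diff above reduces to a simple rule: every provided key must be declared by the factory, except the table-type key itself, which the service handles. A minimal sketch, assuming a hypothetical `TABLE_TYPE_KEY` constant in place of `TableDescriptorValidator.TABLE_TYPE`:

```java
import java.util.Set;

public class PropertyCheckSketch {

    // Hypothetical constant standing in for TableDescriptorValidator.TABLE_TYPE.
    static final String TABLE_TYPE_KEY = "type";

    static void validate(Set<String> providedKeys, Set<String> supportedKeys) {
        for (String k : providedKeys) {
            // the table-type key is exempt; everything else must be declared supported
            if (!k.equals(TABLE_TYPE_KEY) && !supportedKeys.contains(k)) {
                throw new IllegalArgumentException(
                    "Factory does not support the property '" + k + "'.");
            }
        }
    }
}
```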
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524009#comment-16524009 ]

ASF GitHub Bot commented on FLINK-8866:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198215478

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.sql.Timestamp
+import java.util
+import java.util.Collections
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
+import org.apache.flink.table.sources._
+import org.apache.flink.table.util.TableConnectorUtil
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object MemoryTableSourceSinkUtil {
+
+  var tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]()
+
+  def clear = {
+    MemoryTableSourceSinkUtil.tableData.clear()
+  }
+
+  class UnsafeMemoryTableSource(tableSchema: TableSchema,
+    returnType: TypeInformation[Row],
--- End diff --

We usually indent differently. Take `org.apache.flink.table.codegen.CodeGenerator` as an example.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523998#comment-16523998 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198136187

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */

-package org.apache.flink.table.sources
+package org.apache.flink.table.connector

 import java.util

-/**
- * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
- * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
- * describe the desired table source. The factory allows for matching to the given set of
- * properties and creating a configured [[TableSource]] accordingly.
- *
- * Classes that implement this interface need to be added to the
- * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
- * the current classpath to be found.
- */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {

+  /**
+    * Specify the type of the table connector, check
+    * [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for all values.
+    *
+    * @return the table connector type,.
+    */
+  def tableType() : String

--- End diff --

Rename to `getType()`?
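To make the rename concrete: after the suggested change, the unified factory trait could look roughly like the sketch below. This is illustrative only — the method set, the `requiredContext()` signature, and the `CsvSinkFactory` example are assumptions for the sketch, not the API that was merged into Flink.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical shape of the unified factory trait after the review comments
// (tableType() renamed to getType()). A sketch of the design under discussion,
// not Flink's actual merged API.
trait TableConnectorFactory[T] {
  /** Returns "source", "sink" or "both", as validated by TableDescriptorValidator. */
  def getType(): String

  /** Normalized properties this factory matches against during SPI discovery. */
  def requiredContext(): JMap[String, String]

  /** Creates the configured connector from normalized properties. */
  def create(properties: JMap[String, String]): T
}

// Minimal illustrative implementation; the property keys are placeholders.
class CsvSinkFactory extends TableConnectorFactory[String] {
  override def getType(): String = "sink"

  override def requiredContext(): JMap[String, String] = {
    val context = new JHashMap[String, String]()
    context.put("connector.type", "filesystem")
    context
  }

  override def create(properties: JMap[String, String]): String =
    "CsvTableSink(" + properties.getOrDefault("connector.path", "<missing>") + ")"
}
```

The single `getType()` discriminator is what lets one factory service handle sources and sinks uniformly instead of maintaining two parallel SPI hierarchies.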
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524002#comment-16524002 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198148852

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * Common class for all descriptors describing a table sink.
+ */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {

--- End diff --

Please add a comment to this method as it will be public in a Java API (similar to TableSourceDescriptor).
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524008#comment-16524008 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198151603

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * Common class for all descriptors describing a table sink.
+ */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {

--- End diff --

Do we need a `TableSinkDescriptor`? Can't we unify `TableSinkDescriptor` and `TableSourceDescriptor` to `TableDescriptor`?
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524015#comment-16524015 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198223665

--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
 		}
 	}
 
+	private void executeUpdateInternal(ExecutionContext context, String query) {
+		final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+		envInst.getTableEnvironment().sqlUpdate(query);
+		// create job graph with dependencies
+		final String jobName = context.getSessionContext().getName() + ": " + query;
+		final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+		// create execution
+		new Thread(new ProgramDeployer<>(context, jobName, jobGraph, null)).start();

--- End diff --

I think even a detached job needs to return a result. Otherwise you cannot be sure if the job has been submitted or not. E.g., the cluster might not be reachable. In any case, every created thread should be managed by the result store. So we should have a similar architecture as for queries. Maybe instead of `CollectStreamResult` a `StatusResult`. Maybe we should do the SQL Client changes in a separate PR?
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524011#comment-16524011 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198215161

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
@@ -712,7 +712,48 @@ class SqlITCase extends StreamingWithStateTestBase {
       "1,1,Hi,1970-01-01 00:00:00.001",
       "2,2,Hello,1970-01-01 00:00:00.002",
       "3,2,Hello world,1970-01-01 00:00:00.002")
-    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableData.map(_.toString).sorted)
   }
 
+  @Test
+  def testWriteReadTableSourceSink(): Unit = {
+    var env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    var tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("a", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+
+    val tableSchema = new TableSchema(
+      Array("a", "e", "f", "t", "rowtime", "proctime"),
+      Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP,
+        Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("a", "e", "f", "t"))
+    tEnv.registerTableSource("targetTable", new MemoryTableSourceSinkUtil.UnsafeMemoryTableSource(
+      tableSchema, returnType, "rowtime", 3))
+    tEnv.registerTableSink("targetTable",
+      new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames, fieldTypes))
+
+    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable")
+    tEnv.sqlQuery("SELECT a, e, f, t, rowtime from targetTable")

--- End diff --

I think we need more test cases about how we handle the time attributes for `both` table types. Maybe not only ITCases but also unit tests. The `configure` method is an internal method that should not be called here.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524005#comment-16524005 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198137230

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */

-package org.apache.flink.table.sources
+package org.apache.flink.table.connector

 import java.util.{ServiceConfigurationError, ServiceLoader}

-import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging

-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable

 /**
- * Service provider interface for finding suitable table source factories for the given properties.
+ * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]]
+ * and [[org.apache.flink.table.sinks.TableSink]].
  */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {

-  private lazy val defaultLoader = ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = ServiceLoader.load(classOf[TableConnectorFactory[_]])

-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): TableSource[_] = {
-    findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+    findAndCreateTableConnector(descriptor, null)
   }

-  def findAndCreateTableSource(
-      descriptor: TableSourceDescriptor,
-      classLoader: ClassLoader)
-    : TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, classLoader: ClassLoader)

--- End diff --

This can fit into one line.
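The service class above relies on Java's Service Provider Interface: `ServiceLoader.load` reads `META-INF/services/<fully.qualified.FactoryName>` files from the classpath and instantiates every listed factory, after which the service filters them by type. The sketch below shows that filtering step in isolation — the `ConnectorFactory` trait and `findFactory` helper are hypothetical stand-ins, not Flink code:

```scala
// Sketch of the discovery logic such a factory service implements: given all
// factories found via SPI, pick the first whose declared type matches the
// requested table type ("both" factories satisfy either request).
// In the real service the sequence would come from something like
//   ServiceLoader.load(classOf[ConnectorFactory]).iterator()
// which reads META-INF/services entries on the classpath.
trait ConnectorFactory {
  def tableType: String // "source", "sink" or "both"
}

def findFactory(factories: Seq[ConnectorFactory], wanted: String): Option[ConnectorFactory] =
  factories.find(f => f.tableType == wanted || f.tableType == "both")
```

This is also why the review thread insists the table type live in a dedicated method rather than in the supported-properties map: the service needs it for matching before any property validation happens.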
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524013#comment-16524013 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198154676

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala ---
@@ -39,9 +39,12 @@ abstract class PhysicalTableSourceScan(

   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
-      case _: StreamTableSourceTable[_] => true
-      case _: BatchTableSourceTable[_] => false
+    val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match {
+      case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
+        case _: StreamTableSourceTable[_] => true

--- End diff --

The patterns are incorrect since the method returns an option.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524012#comment-16524012 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198147548

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala ---
@@ -27,6 +28,8 @@ class FileSystem extends ConnectorDescriptor(
     CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) {

   private var path: Option[String] = None
+  private var numFiles: Option[Int] = None

--- End diff --

Please remove these unrelated changes and open a separate issue for them.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523999#comment-16523999 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198146395

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {

     // check for supported properties
     plainProperties.foreach { k =>
-      if (!supportedProperties.contains(k)) {
+      if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && !supportedProperties.contains(k)) {

--- End diff --

The table type must not be part of the supported properties as it is defined in a separate method in the `TableConnectorFactory` interface.
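The `TABLE_TYPE` property filtered above corresponds to item 3 of the issue description: in the SQL Client environment file, the `sources` section becomes `tables`, and each entry carries a type discriminator. A possible shape of such a file is sketched below — the key names, file names, and connector settings are illustrative assumptions, not the format that was finally adopted:

```yaml
# Hypothetical SQL Client environment file under the proposed scheme:
# the former "sources" section becomes "tables", and each entry declares
# whether it acts as a source, a sink, or both.
tables:
  - name: TaxiRides
    type: source          # source | sink | both
    connector:
      type: filesystem
      path: /data/taxi-rides.csv
    format:
      type: csv
  - name: RideStats
    type: sink
    connector:
      type: filesystem
      path: /data/ride-stats.csv
    format:
      type: csv
```

Declaring sinks this way is what allows the client to resolve the target of an `INSERT INTO` statement from the environment file alone.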
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524003#comment-16524003 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198135694

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */

-package org.apache.flink.table.sources
+package org.apache.flink.table.connector

 import java.util

-/**
- * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
- * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
- * describe the desired table source. The factory allows for matching to the given set of
- * properties and creating a configured [[TableSource]] accordingly.
- *
- * Classes that implement this interface need to be added to the
- * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
- * the current classpath to be found.
- */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {

+  /**
+    * Specify the type of the table connector, check
+    * [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for all values.
+    *
+    * @return the table connector type,.

--- End diff --

remove comma
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524007#comment-16524007 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198207327

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connector/TableSinkFactoryServiceTest.scala ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector
+
+import org.apache.flink.table.api.{NoMatchingTableConnectorException, TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.TableDescriptorValidator
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSinkFactoryServiceTest {
+
+  @Test
+  def testValidProperties(): Unit = {
+    val props = properties()
+    assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[NoMatchingTableConnectorException])
+  def testInvalidContext(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, "FAIL")
+    TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_PROPERTY_VERSION, "2")
+    // the table source should still be found
+    assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+    val props = properties()
+    props.put("format.path_new", "/new/path")
+    TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+    val props = properties()
+    props.put("failing", "true")
+    TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  private def properties(): mutable.Map[String, String] = {
+    val properties = mutable.Map[String, String]()
+    properties.put(TableDescriptorValidator.TABLE_TYPE,
+      TableDescriptorValidator.TABLE_TYPE_VALUE_SINK)
+    properties.put(CONNECTOR_TYPE, "test")

--- End diff --

I would use strings here for everything (not the variables). This allows tests to fail if we refactor one of the properties.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524010#comment-16524010 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6201#discussion_r198160808

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+
+class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]],

--- End diff --

Call this `ExternalTable` or `ConnectorTable`? Maybe we can add more helper methods here like `hasSource()`, `getSource` in order to reduce the many pattern matching changes in this PR. I'm also wondering if we need the separate classes `TableSinkTable` and `TableSourceTable` anymore. We could store `TableSink` and `TableSource` directly in `UnifiedTable`.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524001#comment-16524001 ]

ASF GitHub Bot commented on FLINK-8866:

Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198134945

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala

    -package org.apache.flink.table.sources
    +package org.apache.flink.table.connector
     import java.util
    -/**

Add the updated comment again.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524014#comment-16524014 ]

ASF GitHub Bot commented on FLINK-8866:

Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198219005

Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java

    private void executeUpdateInternal(ExecutionContext context, String query) {
        final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();

        envInst.getTableEnvironment().sqlUpdate(query);

We also need to ship the query config here.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524004#comment-16524004 ]

ASF GitHub Bot commented on FLINK-8866:

Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198142339

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala

    // check if required context is met
    -if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
    +if (properties.get(TableDescriptorValidator.TABLE_TYPE).get.equals(factory.tableType()) &&

Consider cases where the type has not been set. Btw, `properties.get(TableDescriptorValidator.TABLE_TYPE).get` can be simplified to `properties(TableDescriptorValidator.TABLE_TYPE)`. It might be useful to enable more warnings in your IDE.
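The review point above, that the lookup must not fail when the `type` property is absent, can be sketched without the Flink classes. The `"type"` key name and the `factoryType` parameter are stand-ins for `TableDescriptorValidator.TABLE_TYPE` and `factory.tableType()`:

```java
import java.util.Map;
import java.util.Optional;

public class FactoryMatcher {
    // Stand-in for TableDescriptorValidator.TABLE_TYPE.
    static final String TABLE_TYPE = "type";

    // Returns true only when the "type" property is present AND equals the
    // factory's declared type. A missing key yields false instead of throwing,
    // which is the failure mode of calling .get on an empty Scala Option.
    static boolean typeMatches(Map<String, String> properties, String factoryType) {
        return Optional.ofNullable(properties.get(TABLE_TYPE))
                .map(t -> t.equals(factoryType))
                .orElse(false);
    }
}
```

A factory-service loop can then skip factories whose declared type does not match, without special-casing untyped property sets.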
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524006#comment-16524006 ]

ASF GitHub Bot commented on FLINK-8866:

Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198137289

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala

    /**
    - * Service provider interface for finding suitable table source factories for the given properties.
    + * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]]
    + * and [[org.apache.flink.table.sinks.TableSink]].
     */
    -object TableSourceFactoryService extends Logging {
    +class TableConnectorFactoryService[T] extends Logging {

Make abstract?
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523995#comment-16523995 ]

ASF GitHub Bot commented on FLINK-8866:

Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198135589

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala

    -trait TableSourceFactory[T] {
    +trait TableConnectorFactory[T] {

Actually, we could also simplify this and call it `TableFactory`. What do you think? We also say `CREATE TABLE`, not `CREATE TABLE CONNECTOR`, in SQL.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520057#comment-16520057 ]

ASF GitHub Bot commented on FLINK-8866:

GitHub user suez1224 opened a pull request:
https://github.com/apache/flink/pull/6201
[FLINK-8866][Table API & SQL] Add support for unified table sink instantiation

## What is the purpose of the change

Add interfaces to support unified table sink configuration and instantiation. Consolidate table source and table sink configuration and instantiation.

## Brief change log

- Consolidate table sink and table source instantiation with TableConnectorFactory{Service}.
- Add support to register a Calcite table with both a TableSource and a TableSink.
- Add INSERT INTO command support in the SQL client.
- Add CsvTableSinkFactory.

## Verifying this change

This change added tests and can be verified as follows:

- Added integration tests for registering source and sink tables under the same name.
- Added integration tests for the INSERT INTO command in the SQL client.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink FLINK-8866-2

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6201.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6201

commit fb1a021f86668f326f6322b52a324da8e6eb1f47
Author: Shuyi Chen
Date: 2018-06-19T19:00:34Z

- Add unified table sink instantiation.
- Consolidate table sink and table source instantiation.
- Add support to register a Calcite table with both a TableSource and a TableSink.
- Add INSERT INTO command support in the SQL client.
- Add CsvTableSinkFactory.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477127#comment-16477127 ]

Fabian Hueske commented on FLINK-8866:

Regarding (3): originally, {{TableSink}} was designed to be flexible with regard to the output schema. A query that should be emitted was planned by the optimizer. Based on the resulting type, the TableSink was internally configured for the result schema. The configuration method produces a configured copy of the sink that is used to emit the result. So the TableSink was not known to Calcite and was only handled by the TableEnvironment.

When we added support for {{INSERT INTO}}, this didn't work anymore, because Calcite validates that the schema of the target table is compatible with the result schema of the SELECT query. Hence, we added the field names and types to the registration, configure the TableSink, and register the newly configured TableSink in Calcite's catalog. By doing it this way, we did not have to change the interface of the TableSink, which not only meant backwards compatibility but also that all TableSinks can be used either way.
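The configure-and-copy pattern described above can be sketched without the real Flink interfaces. `SketchSink` is a simplified stand-in for a `TableSink`; only the shape of `configure(...)` mirrors the mechanism Fabian describes:

```java
// Simplified stand-in for a TableSink: configure(...) does not mutate the sink,
// it returns a copy bound to the result schema chosen by the optimizer, so the
// original, unconfigured sink stays reusable for other queries.
public class SketchSink {
    private final String[] fieldNames;
    private final Class<?>[] fieldTypes;

    public SketchSink() { this(new String[0], new Class<?>[0]); }

    private SketchSink(String[] names, Class<?>[] types) {
        this.fieldNames = names;
        this.fieldTypes = types;
    }

    // Returns a configured copy carrying the result schema.
    public SketchSink configure(String[] names, Class<?>[] types) {
        return new SketchSink(names.clone(), types.clone());
    }

    public String[] getFieldNames() { return fieldNames.clone(); }
}
```

The configured copy is what gets registered in the catalog; the unconfigured original never needs to know a schema up front.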
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16475416#comment-16475416 ]

Timo Walther commented on FLINK-8866:

I agree with (1); we could definitely reduce the amount of copied code. A design doc would be very helpful for discussing (2). The need for field names and types should depend on the connector/format, or should serve as a safety barrier before writing.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16475177#comment-16475177 ]

Rong Rong commented on FLINK-8866:

+1 on points (1) and (2); I found huge chunks of code being copied when trying to create the TableSink[Factory/FactoryService] components with the current architecture. Regarding (3), I think there might be some SinkFunctions where fieldName and fieldType are necessary to validate during initialization of the sink function (such as a JDBC sink, where the underlying JDBC driver is loaded at runtime, I believe). What do you think? Should we still consider having them as an *optional* part of the configuration?
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474801#comment-16474801 ]

Shuyi Chen commented on FLINK-8866:

Hi [~walterddr], [~twalthr], [~fhueske], thanks a lot for the comments. I think there are a few challenges for this JIRA:

1) There can be a lot of duplicate code dealing with the unified table sink instantiation if we do it in the same way as TableSourceFactory/TableSourceFactoryService, so we should try to refactor/redesign it to make it cleaner.
2) To support a table which can be both source and sink, we need a unified interface, at least when interacting with Calcite, so the same table name can be used for the source and sink in SQL.
3) When registering TableSinks, the current registerTableSink interface takes the additional parameters _fieldNames_ and _fieldTypes_, which I don't think are necessary and which add complexity when integrating with SQL DDL and the SQL client.

I am experimenting with the changes needed in my local branch and writing a design doc. Would love to share the design doc soon when I think it's ready.
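Point (2), one table name resolving to either role, can be sketched with a tiny catalog. All names here (`MiniCatalog`, `resolveSource`, `resolveSink`) are hypothetical illustrations, not Flink or Calcite API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a single catalog entry per table name that can carry a
// source, a sink, or both, so "SELECT ... FROM t" and "INSERT INTO t" resolve
// through the same name.
public class MiniCatalog<S, K> {
    static final class Entry<A, B> {
        final Optional<A> source;
        final Optional<B> sink;
        Entry(Optional<A> source, Optional<B> sink) { this.source = source; this.sink = sink; }
    }

    private final Map<String, Entry<S, K>> tables = new HashMap<>();

    public void register(String name, Optional<S> source, Optional<K> sink) {
        tables.put(name, new Entry<>(source, sink));
    }

    // A SELECT resolves the source side of the entry.
    public S resolveSource(String name) {
        return tables.get(name).source
                .orElseThrow(() -> new IllegalArgumentException(name + " is not readable"));
    }

    // An INSERT INTO resolves the sink side of the same entry.
    public K resolveSink(String name) {
        return tables.get(name).sink
                .orElseThrow(() -> new IllegalArgumentException(name + " is not writable"));
    }
}
```

Registering a read-only or write-only table simply leaves one side empty, and misuse fails at resolution time.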
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474463#comment-16474463 ]

Rong Rong commented on FLINK-8866:

I think they should be designed to be usable in both source and sink cases. It doesn't really make sense to separate them, at least not in [~twalthr]'s Kafka example, and I don't really think there's any difference for others either. The only thing I can think of is a security/authorization concern, e.g. whether sharing configurations for read-only access and write access in the same location is safe or not. [~suez1224], what do you think?

I haven't implemented the "both" part; this diff is merely trying to clear up some questions I had while implementing FLINK-8880. Also, there are some components which I think can definitely be unified at the Table API level, such as the Source/Sink Factory service. That would make the APIs more unified. What do you guys think?

-- Rong
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474115#comment-16474115 ]

Fabian Hueske commented on FLINK-8866:

I think we should design the system properties in a way that they can be used for both sources and sinks. For most connectors, source and sink properties should not make a difference.
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474096#comment-16474096 ]

Timo Walther commented on FLINK-8866:

Hi [~walterddr], the changes for FLINK-8866 look good to me. I wonder whether this design also works for concrete implementations of connectors. E.g., if we use the table type "both", would a Kafka table source and sink require the same properties as a regular "source", or slightly different properties?
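The environment-file shape under discussion, with the proposed `type` key taking `source`, `sink`, or `both`, might look like the following. The `tables` section name and the `type` values come from this issue's proposal; the connector and format details are made up for illustration:

```yaml
tables:
  - name: taxi_records
    type: both            # proposed values: source, sink, both
    connector:
      type: kafka
      topic: taxi-records
    format:
      type: json
```

With `type: both`, the same entry would back both SELECT queries and INSERT INTO statements, which is exactly where the question of shared versus role-specific properties arises.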
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472571#comment-16472571 ]

Rong Rong commented on FLINK-8866:

Hi [~suez1224], I was working on FLINK-8880 and thought it would be great to have some understanding of how this JIRA is going to be implemented, since some of the functionality in the description is already implemented in FLINK-9059. I have a preliminary version that only supports "source" and "sink" and was wondering if it could be part of the solution:
https://github.com/apache/flink/compare/master...walterddr:FLINK-8866

Please kindly take a look, many thanks.
Rong