[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544494#comment-16544494 ] ASF GitHub Bot commented on FLINK-8558: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6323 > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544465#comment-16544465 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/6323 Thanks @pnowojski. Merging this... > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544464#comment-16544464 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202535180 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T],
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544291#comment-16544291 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202521212 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass:
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544289#comment-16544289 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202521122 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) --- End diff -- But this leads to quite a lot of unnecessary code duplication. `checkNotNull(factoryClass)` appears 5 times here and the same applies to the other params. Doing the check only in the place where you actually touch the value would either solve or at least limit this issue. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544290#comment-16544290 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202520986 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java --- @@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) { } public Source toSource() { - final Map newProperties = new HashMap<>(properties); - newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), - TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()); --- End diff -- I'm just asking because I cannot find what code has replaced them. Or was it dead code? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544288#comment-16544288 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202521065 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala --- @@ -16,42 +16,17 @@ * limitations under the License. */ -package org.apache.flink.table.formats +package org.apache.flink.table.factories import java.util -import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} - /** - * A factory to create different table format instances. This factory is used with Java's Service - * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized - * properties that describe the desired format. The factory allows for matching to the given set of - * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for - * creating configured instances of format classes accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in - * the current classpath to be found. + * A factory to create configured table format instances based on string-based properties. See + * also [[TableFactory]] for more information. * * @tparam T record type that the format produces or consumes */ -trait TableFormatFactory[T] { - - /** -* Specifies the context that this factory has been implemented for. The framework guarantees -* to only use the factory if the specified set of properties and values are met. 
-* -* Typical properties might be: -* - format.type -* - format.version -* -* Specified property versions allow the framework to provide backwards compatible properties -* in case of string format changes: -* - format.property-version -* -* An empty context means that the factory matches for all requests. -*/ - def requiredContext(): util.Map[String, String] +trait TableFormatFactory[T] extends TableFactory { --- End diff -- Maybe in that case deduplicate the comment with an `@see` Javadoc pointer? Otherwise there is a huge risk of the comments drifting out of sync. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544279#comment-16544279 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202520938 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala --- @@ -16,14 +16,14 @@ * limitations under the License. */ -package org.apache.flink.table.connectors +package org.apache.flink.table.factories import java.util /** * Common trait for all properties-based discoverable table factories. */ -trait DiscoverableTableFactory { +trait TableFactory { --- End diff -- I am not happy but I cannot think of anything better :( `Factory` is way too generic. It implies that this is THE Factory, THE One Factory To Rule Them All and that's not true. There are plenty of other factories in `flink-table`. The `Table` prefix, as you confirmed, is just a package prefix and thus doesn't improve anything :( > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544072#comment-16544072 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/6323 Thank you @pnowojski. I hope I addressed all your comments. I will clean up the commit history and improve the commit messages during merging. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544040#comment-16544040 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202507057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala --- @@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan( override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match { - case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match { -case Some(_: StreamTableSourceTable[_]) => true -case Some(_: BatchTableSourceTable[_]) => false -case _ => throw TableException(s"Unknown Table type ${t.getClass}.") - } - case t => throw TableException(s"Unknown Table type ${t.getClass}.") + case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true + // null --- End diff -- This information is useful. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now).
A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544038#comment-16544038 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506906 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T],
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544028#comment-16544028 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506861 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T],
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544024#comment-16544024 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506766 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T],
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544023#comment-16544023 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506740 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) --- End diff -- The variable is only passed one time. The internal method is not checking for null again. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544019#comment-16544019 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506661 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -143,118 +143,82 @@ case class CatalogAlreadyExistException( } /** - * Exception for not finding a [[TableFormatFactory]] for the - * given properties. + * Exception for not finding a [[TableFactory]] for the given properties. * * @param message message that indicates the current matching step * @param factoryClass required factory class - * @param formatFactories all found factories - * @param properties properties that describe the table format + * @param factories all found factories + * @param properties properties that describe the configuration * @param cause the cause */ -case class NoMatchingTableFormatException( +case class NoMatchingTableFactoryException( message: String, factoryClass: Class[_], - formatFactories: Seq[TableFormatFactory[_]], + factories: Seq[TableFactory], properties: Map[String, String], cause: Throwable) extends RuntimeException( --- End diff -- So far we don't have an inheritance of exceptions. Case classes don't support that in Scala so we would need to convert them. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. 
For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544018#comment-16544018 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506625 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala --- @@ -16,42 +16,17 @@ * limitations under the License. */ -package org.apache.flink.table.formats +package org.apache.flink.table.factories import java.util -import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} - /** - * A factory to create different table format instances. This factory is used with Java's Service - * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized - * properties that describe the desired format. The factory allows for matching to the given set of - * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for - * creating configured instances of format classes accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in - * the current classpath to be found. + * A factory to create configured table format instances based on string-based properties. See + * also [[TableFactory]] for more information. * * @tparam T record type that the format produces or consumes */ -trait TableFormatFactory[T] { - - /** -* Specifies the context that this factory has been implemented for. The framework guarantees -* to only use the factory if the specified set of properties and values are met. 
-* -* Typical properties might be: -* - format.type -* - format.version -* -* Specified property versions allow the framework to provide backwards compatible properties -* in case of string format changes: -* - format.property-version -* -* An empty context means that the factory matches for all requests. -*/ - def requiredContext(): util.Map[String, String] +trait TableFormatFactory[T] extends TableFactory { --- End diff -- Because the Java comment explains the specific situation how supported format properties are handled. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544017#comment-16544017 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506574 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala --- @@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging { val properties = new DescriptorProperties() externalCatalogTable.addProperties(properties) val javaMap = properties.asMap -val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap) - .asInstanceOf[TableSourceFactory[_]] - .createTableSource(javaMap) tableEnv match { // check for a batch table source in this batch environment case _: BatchTableEnvironment => -source match { - case bts: BatchTableSource[_] => -new TableSourceSinkTable(Some(new BatchTableSourceTable( - bts, - new FlinkStatistic(externalCatalogTable.getTableStats))), None) - case _ => throw new TableException( -s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + - s"in a batch environment.") -} +val source = TableFactoryService --- End diff -- It is very uncommon to define both a batch and a streaming source in the same factory. Your proposed change would require all future sources to implement a check for the environment beforehand, which is unnecessary in 80% of the cases. Separating by environment is a concept that can be found throughout the entire `flink-table` module because batch and streaming sources/sinks behave quite differently. 
> Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544016#comment-16544016 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506512 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java --- @@ -21,8 +21,12 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08; /** - * Tests for {@link Kafka08JsonTableSourceFactory}. + * Tests for legacy Kafka08JsonTableSourceFactory. + * + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we + * drop support for format-specific table sources. */ +@Deprecated --- End diff -- No, but we forgot it in the previous commit. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544015#comment-16544015 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506494 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java --- @@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) { } public Source toSource() { - final Map newProperties = new HashMap<>(properties); - newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), - TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()); --- End diff -- The table type is a concept of the SQL Client and should not be part of the table descriptor. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544007#comment-16544007 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506193 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala --- @@ -16,14 +16,14 @@ * limitations under the License. */ -package org.apache.flink.table.connectors +package org.apache.flink.table.factories import java.util /** * Common trait for all properties-based discoverable table factories. */ -trait DiscoverableTableFactory { +trait TableFactory { --- End diff -- Actually, I would like the very generic name `Factory` but since we have to add some prefix to make it unique in the project, I named it `TableFactory` because we prefix everything in this module with `Table`. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544004#comment-16544004 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506126 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -54,51 +56,105 @@ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}. */ @Internal -public abstract class KafkaTableSource - implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes { +public abstract class KafkaTableSource implements + StreamTableSource, + DefinedProctimeAttribute, + DefinedRowtimeAttributes, + DefinedFieldMapping { + + // common table source attributes + // TODO make all attributes final once we drop support for format-specific table sources /** The schema of the table. */ private final TableSchema schema; + /** Field name of the processing time attribute, null if no processing time field is defined. */ + private String proctimeAttribute; + + /** Descriptor for a rowtime attribute. */ + private List rowtimeAttributeDescriptors; + + /** Mapping for the fields of the table schema to fields of the physical returned type or null. */ + private Map fieldMapping; + + // Kafka-specific attributes + /** The Kafka topic to consume. */ private final String topic; /** Properties for the Kafka consumer. */ private final Properties properties; - /** Type information describing the result type. */ - private TypeInformation returnType; - - /** Field name of the processing time attribute, null if no processing time field is defined. */ - private String proctimeAttribute; - - /** Descriptor for a rowtime attribute. */ - private List rowtimeAttributeDescriptors; + /** Deserialization schema for decoding records from Kafka. 
*/ + private final DeserializationSchema deserializationSchema; /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */ - private StartupMode startupMode = StartupMode.GROUP_OFFSETS; + private StartupMode startupMode; /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ private Map specificStartupOffsets; /** * Creates a generic Kafka {@link StreamTableSource}. * -* @param topic Kafka topic to consume. -* @param propertiesProperties for the Kafka consumer. -* @param schemaSchema of the produced table. -* @param returnTypeType information of the produced physical DataStream. +* @param schema Schema of the produced table. +* @param proctimeAttribute Field name of the processing time attribute, null if no +*processing time field is defined. +* @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute +* @param fieldMappingMapping for the fields of the table schema to --- End diff -- Backward compatibility. It could have been null in the past. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543256#comment-16543256 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202269785 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -54,51 +56,105 @@ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}. */ @Internal -public abstract class KafkaTableSource - implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes { +public abstract class KafkaTableSource implements + StreamTableSource, + DefinedProctimeAttribute, + DefinedRowtimeAttributes, + DefinedFieldMapping { + + // common table source attributes + // TODO make all attributes final once we drop support for format-specific table sources /** The schema of the table. */ private final TableSchema schema; + /** Field name of the processing time attribute, null if no processing time field is defined. */ + private String proctimeAttribute; + + /** Descriptor for a rowtime attribute. */ + private List rowtimeAttributeDescriptors; + + /** Mapping for the fields of the table schema to fields of the physical returned type or null. */ + private Map fieldMapping; + + // Kafka-specific attributes + /** The Kafka topic to consume. */ private final String topic; /** Properties for the Kafka consumer. */ private final Properties properties; - /** Type information describing the result type. */ - private TypeInformation returnType; - - /** Field name of the processing time attribute, null if no processing time field is defined. */ - private String proctimeAttribute; - - /** Descriptor for a rowtime attribute. */ - private List rowtimeAttributeDescriptors; + /** Deserialization schema for decoding records from Kafka. 
*/ + private final DeserializationSchema deserializationSchema; /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */ - private StartupMode startupMode = StartupMode.GROUP_OFFSETS; + private StartupMode startupMode; /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ private Map specificStartupOffsets; /** * Creates a generic Kafka {@link StreamTableSource}. * -* @param topic Kafka topic to consume. -* @param propertiesProperties for the Kafka consumer. -* @param schemaSchema of the produced table. -* @param returnTypeType information of the produced physical DataStream. +* @param schema Schema of the produced table. +* @param proctimeAttribute Field name of the processing time attribute, null if no +*processing time field is defined. +* @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute +* @param fieldMappingMapping for the fields of the table schema to --- End diff -- How can this field ever be null? `SchemaValidator.deriveFieldMapping` doesn't allow for that. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). 
A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543243#comment-16543243 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202332281 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala --- @@ -16,42 +16,17 @@ * limitations under the License. */ -package org.apache.flink.table.formats +package org.apache.flink.table.factories import java.util -import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} - /** - * A factory to create different table format instances. This factory is used with Java's Service - * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized - * properties that describe the desired format. The factory allows for matching to the given set of - * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for - * creating configured instances of format classes accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in - * the current classpath to be found. + * A factory to create configured table format instances based on string-based properties. See + * also [[TableFactory]] for more information. * * @tparam T record type that the format produces or consumes */ -trait TableFormatFactory[T] { - - /** -* Specifies the context that this factory has been implemented for. The framework guarantees -* to only use the factory if the specified set of properties and values are met. 
-* -* Typical properties might be: -* - format.type -* - format.version -* -* Specified property versions allow the framework to provide backwards compatible properties -* in case of string format changes: -* - format.property-version -* -* An empty context means that the factory matches for all requests. -*/ - def requiredContext(): util.Map[String, String] +trait TableFormatFactory[T] extends TableFactory { --- End diff -- why haven't you removed `supportedProperties` as well? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543251#comment-16543251 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202337455 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass:
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543260#comment-16543260 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202348312 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java --- @@ -63,65 +62,45 @@ public void run() { LOG.debug("Submitting job {} with the following environment: \n{}", jobGraph.getJobID(), context.getMergedEnvironment()); } - if (result != null) { - executionResultBucket.add(deployJob(context, jobGraph, result)); - } else { - deployJob(context, jobGraph, result); - } + executionResultBucket.add(deployJob(context, jobGraph, result)); } public JobExecutionResult fetchExecutionResult() { - if (result != null) { - return executionResultBucket.poll(); - } else { - return null; - } + return executionResultBucket.poll(); } /** -* Deploys a job. Depending on the deployment creates a new job cluster. If result is requested, -* it saves the cluster id in the result and blocks until job completion. +* Deploys a job. Depending on the deployment creates a new job cluster. It saves the cluster id in --- End diff -- How do those changes in this file relate to rest of the PR/commit? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. 
For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543241#comment-16543241 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202327344 --- Diff: flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory --- @@ -13,5 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +<<< HEAD:flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory --- End diff -- ? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543258#comment-16543258 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202354777 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala --- @@ -21,23 +21,47 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.calcite.schema.Statistic import org.apache.calcite.schema.impl.AbstractTable +import org.apache.flink.table.api.TableException -class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]], - val tableSinkTableOpt: Option[TableSinkTable[T2]]) +/** --- End diff -- ditto: fixup Shuyi's commit > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543249#comment-16543249 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202335703 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass:
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543230#comment-16543230 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202274951 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala --- @@ -16,14 +16,14 @@ * limitations under the License. */ -package org.apache.flink.table.connectors +package org.apache.flink.table.factories import java.util /** * Common trait for all properties-based discoverable table factories. */ -trait DiscoverableTableFactory { +trait TableFactory { --- End diff -- name `TableFactory` is kind of strange. `Table` here is more like package name and `Factory` is generic. I think I liked the previous name better, although it wasn't also very good but it's hard for me to figure out a better one. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543253#comment-16543253 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202352869 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala --- @@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan( override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match { - case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match { -case Some(_: StreamTableSourceTable[_]) => true -case Some(_: BatchTableSourceTable[_]) => false -case _ => throw TableException(s"Unknown Table type ${t.getClass}.") - } - case t => throw TableException(s"Unknown Table type ${t.getClass}.") + case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true + // null --- End diff -- drop `//null` ? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). 
A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543231#comment-16543231 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202269870 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -134,34 +190,60 @@ public String getProctimeAttribute() { return rowtimeAttributeDescriptors; } + @Override + public Map getFieldMapping() { + return fieldMapping; + } + @Override public String explainSource() { return TableConnectorUtil.generateRuntimeName(this.getClass(), schema.getColumnNames()); } + /** +* Returns the properties for the Kafka consumer. +* +* @return properties for the Kafka consumer. +*/ + public Properties getProperties() { + return properties; + } + + /** +* Returns the deserialization schema. +* +* @return The deserialization schema +*/ + public DeserializationSchema getDeserializationSchema(){ + return deserializationSchema; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof KafkaTableSource)) { + // TODO force classes to be equal once we drop support for format-specific table sources + // if (o == null || getClass() != o.getClass()) { + if (o == null || !(o instanceof KafkaTableSource)) { return false; } - KafkaTableSource that = (KafkaTableSource) o; + final KafkaTableSource that = (KafkaTableSource) o; return Objects.equals(schema, that.schema) && - Objects.equals(topic, that.topic) && - Objects.equals(properties, that.properties) && - Objects.equals(returnType, that.returnType) && Objects.equals(proctimeAttribute, that.proctimeAttribute) && Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) && + Objects.equals(fieldMapping, that.fieldMapping) && + Objects.equals(topic, that.topic) && + 
Objects.equals(properties, that.properties) && + Objects.equals(deserializationSchema, that.deserializationSchema) && startupMode == that.startupMode && Objects.equals(specificStartupOffsets, that.specificStartupOffsets); } @Override public int hashCode() { - return Objects.hash(schema, topic, properties, returnType, - proctimeAttribute, rowtimeAttributeDescriptors, startupMode, specificStartupOffsets); + return Objects.hash(schema, proctimeAttribute, rowtimeAttributeDescriptors, fieldMapping, --- End diff -- format one entry per line > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543245#comment-16543245 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202334605 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass:
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543254#comment-16543254 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202351647 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -465,14 +465,14 @@ abstract class TableEnvironment(val config: TableConfig) { tableSink: TableSink[_]): Unit /** -* Registers an external [[TableSink]] which is already configured in this -* [[TableEnvironment]]'s catalog. +* Registers an external [[TableSink]] with already configured field names and field types in --- End diff -- ditto? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543234#comment-16543234 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202282221 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -214,15 +216,18 @@ private static TableDescriptor createTableDescriptor(String name, Map properties = new HashMap<>(config); config.remove(TABLE_TYPE); - final Map normalizedConfig = ConfigUtil.normalizeYaml(config); - if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { - return new Source(name, normalizedConfig); - } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) { - return new Sink(name, normalizedConfig); - } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) { - return new SourceSink(name, normalizedConfig); + + final Map normalizedProperties = ConfigUtil.normalizeYaml(properties); + switch (type) { + case TABLE_TYPE_VALUE_SOURCE: + return new Source(name, normalizedProperties); + case TABLE_TYPE_VALUE_SINK: + return new Sink(name, normalizedProperties); + case TABLE_TYPE_VALUE_BOTH: + return new SourceSink(name, normalizedProperties); --- End diff -- ``` default: throw new UnsupportedOperationException( String.format("Unsupported [%s] value [%s]", TABLE_TYPE, type) ``` > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. 
Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543233#comment-16543233 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202275530 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala --- @@ -21,7 +21,14 @@ package org.apache.flink.table.factories import java.util /** - * Common trait for all properties-based discoverable table factories. + * A factory to create different table-related instances from string-based properties. This + * factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is + * called with a set of normalized properties that describe the desired format. The factory allows --- End diff -- isn't this comment a bit misleading? > that describe the desired format. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543228#comment-16543228 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202268969 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala --- @@ -50,7 +50,8 @@ trait DefinedFieldMapping { * type. It can also provide a mapping for fields which are not in the [[TableSchema]] to make * fields in the physical [[TypeInformation]] accessible for a [[TimestampExtractor]]. * -* @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields. +* @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields or +* null if no mapping is necessary. */ def getFieldMapping: JMap[String, String] --- End diff -- annotate `Nullable` or change to `Optional`. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543242#comment-16543242 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202282838 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java --- @@ -21,8 +21,12 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08; /** - * Tests for {@link Kafka08JsonTableSourceFactory}. + * Tests for legacy Kafka08JsonTableSourceFactory. + * + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we + * drop support for format-specific table sources. */ +@Deprecated --- End diff -- Does this modification belong to this commit? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543229#comment-16543229 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202268019 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -54,51 +56,105 @@ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}. */ @Internal -public abstract class KafkaTableSource - implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes { +public abstract class KafkaTableSource implements + StreamTableSource, + DefinedProctimeAttribute, + DefinedRowtimeAttributes, + DefinedFieldMapping { + + // common table source attributes + // TODO make all attributes final once we drop support for format-specific table sources /** The schema of the table. */ private final TableSchema schema; + /** Field name of the processing time attribute, null if no processing time field is defined. */ + private String proctimeAttribute; + + /** Descriptor for a rowtime attribute. */ + private List rowtimeAttributeDescriptors; + + /** Mapping for the fields of the table schema to fields of the physical returned type or null. */ + private Map fieldMapping; + + // Kafka-specific attributes + /** The Kafka topic to consume. */ private final String topic; /** Properties for the Kafka consumer. */ private final Properties properties; - /** Type information describing the result type. */ - private TypeInformation returnType; - - /** Field name of the processing time attribute, null if no processing time field is defined. */ - private String proctimeAttribute; - - /** Descriptor for a rowtime attribute. */ - private List rowtimeAttributeDescriptors; + /** Deserialization schema for decoding records from Kafka. 
*/ + private final DeserializationSchema deserializationSchema; /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */ - private StartupMode startupMode = StartupMode.GROUP_OFFSETS; + private StartupMode startupMode; /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ private Map specificStartupOffsets; /** * Creates a generic Kafka {@link StreamTableSource}. * -* @param topic Kafka topic to consume. -* @param propertiesProperties for the Kafka consumer. -* @param schemaSchema of the produced table. -* @param returnTypeType information of the produced physical DataStream. +* @param schema Schema of the produced table. +* @param proctimeAttribute Field name of the processing time attribute, null if no --- End diff -- > null `Optional` or overload or use builder or disallow null. Same applies to classes that are extending from this one. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543259#comment-16543259 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202356170 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala --- @@ -104,17 +100,12 @@ class CsvTableSinkFactory "Encodings that differ from the schema are not supported yet for CsvTableSink.") } -toScala(params.getOptionalString(CONNECTOR_PATH)) - .foreach(csvTableSinkBuilder.path) -toScala(params.getOptionalInt(NUM_FILES)) - .foreach(n => csvTableSinkBuilder.numFiles(n)) -toScala(params.getOptionalString(WRITE_MODE)) - .foreach(csvTableSinkBuilder.writeMode) -toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)) - .foreach(csvTableSinkBuilder.fieldDelimiter) - -csvTableSinkBuilder - .build() +val path = params.getString(CONNECTOR_PATH) --- End diff -- ditto fixup > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543236#comment-16543236 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202336263 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass:
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543227#comment-16543227 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202268054 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -54,51 +56,105 @@ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}. */ @Internal -public abstract class KafkaTableSource - implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes { +public abstract class KafkaTableSource implements + StreamTableSource, + DefinedProctimeAttribute, + DefinedRowtimeAttributes, + DefinedFieldMapping { + + // common table source attributes + // TODO make all attributes final once we drop support for format-specific table sources /** The schema of the table. */ private final TableSchema schema; + /** Field name of the processing time attribute, null if no processing time field is defined. */ + private String proctimeAttribute; + + /** Descriptor for a rowtime attribute. */ + private List rowtimeAttributeDescriptors; + + /** Mapping for the fields of the table schema to fields of the physical returned type or null. */ + private Map fieldMapping; + + // Kafka-specific attributes + /** The Kafka topic to consume. */ private final String topic; /** Properties for the Kafka consumer. */ private final Properties properties; - /** Type information describing the result type. */ - private TypeInformation returnType; - - /** Field name of the processing time attribute, null if no processing time field is defined. */ - private String proctimeAttribute; - - /** Descriptor for a rowtime attribute. */ - private List rowtimeAttributeDescriptors; + /** Deserialization schema for decoding records from Kafka. 
*/ + private final DeserializationSchema deserializationSchema; /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */ - private StartupMode startupMode = StartupMode.GROUP_OFFSETS; + private StartupMode startupMode; /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ private Map specificStartupOffsets; /** * Creates a generic Kafka {@link StreamTableSource}. * -* @param topic Kafka topic to consume. -* @param propertiesProperties for the Kafka consumer. -* @param schemaSchema of the produced table. -* @param returnTypeType information of the produced physical DataStream. +* @param schema Schema of the produced table. +* @param proctimeAttribute Field name of the processing time attribute, null if no +*processing time field is defined. +* @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute +* @param fieldMappingMapping for the fields of the table schema to --- End diff -- >or null ditto > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543237#comment-16543237 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202282478 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java --- @@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) { } public Source toSource() { - final Map newProperties = new HashMap<>(properties); - newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), - TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()); --- End diff -- why did those lines disappear? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543239#comment-16543239 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202271871 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java --- @@ -124,110 +123,127 @@ params.putProperties(properties); // validate - new SchemaValidator(true).validate(params); + // allow Kafka timestamps to be used, watermarks can not be received from source + new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(params); new KafkaValidator().validate(params); - formatValidator().validate(params); - // build - final KafkaTableSource.Builder builder = createBuilderWithFormat(params); + // deserialization schema using format discovery + final DeserializationSchemaFactory formatFactory = TableFormatFactoryService.find( + DeserializationSchemaFactory.class, + properties, + this.getClass().getClassLoader()); + @SuppressWarnings("unchecked") + final DeserializationSchema deserializationSchema = (DeserializationSchema) formatFactory + .createDeserializationSchema(properties); - // topic - final String topic = params.getString(CONNECTOR_TOPIC); - builder.forTopic(topic); + // schema + final TableSchema schema = params.getTableSchema(SCHEMA()); + + // proctime + final String proctimeAttribute = SchemaValidator.deriveProctimeAttribute(params).orElse(null); --- End diff -- this is kind of ridiculous :/ Deep inside we work on `Optional` and then we switch to null... Please drag this `Optional` until the very end and do this `orElse(null)` conversion in `org.apache.flink.table.sources.DefinedProctimeAttribute#getProctimeAttribute` implementation. 
> Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543248#comment-16543248 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202334688 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass:
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543247#comment-16543247 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202334214 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) --- End diff -- if you are not touching variables but only passing them over somewhere, you can safely skip `checkNotNull`. Handy if some variable is being passed multiple times (like here) > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543257#comment-16543257 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202356462 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala --- @@ -186,7 +186,7 @@ class TableEnvironmentITCase( def testInsertIntoMemoryTable(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) -MemoryTableSourceSinkUtil.clear +MemoryTableSourceSinkUtil.clear() --- End diff -- ditto, and the same probably applies to hundreds of other changes in this commit :( > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543252#comment-16543252 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202350406 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -114,6 +114,8 @@ abstract class StreamTableEnvironment( : Unit = { tableSource match { + + // check for proper stream table source --- End diff -- ditto? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543240#comment-16543240 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202289678 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala --- @@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging { val properties = new DescriptorProperties() externalCatalogTable.addProperties(properties) val javaMap = properties.asMap -val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap) - .asInstanceOf[TableSourceFactory[_]] - .createTableSource(javaMap) tableEnv match { // check for a batch table source in this batch environment case _: BatchTableEnvironment => -source match { - case bts: BatchTableSource[_] => -new TableSourceSinkTable(Some(new BatchTableSourceTable( - bts, - new FlinkStatistic(externalCatalogTable.getTableStats))), None) - case _ => throw new TableException( -s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + - s"in a batch environment.") -} +val source = TableFactoryService --- End diff -- 1. This case looks kind of ugly. Shouldn't it be unified to something like: ``` TableFactoryService.find(classOf[TableSourceFactory], javaMap) .createTableSource(javaMap, environment.isInstanceOf[StreamTableEnvironment]) ``` ? If you really want, you can on top of that define methods: ``` def createBatchTableSource(val javaMap) = createTableSource(javaMap, isStream = false) def createStreamTableSource(val javaMap) = createTableSource(javaMap, isStream = true) ``` but I would skip them. Factories could choose whether or not to support streaming/batch tables. The same applies to similar code in `ExecutionContext.java`. 2.
Ditto: Is there any significant value in keeping both `BatchTableSink(Source)Factory` and `StreamTableSink(Source)Factory`? Both 1. and 2. add quite a lot of boilerplate code for the definition and for the caller. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543250#comment-16543250 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202338408 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass:
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543255#comment-16543255 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202347281 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -329,18 +329,6 @@ public void stop(SessionContext session) { } } - private void executeUpdateInternal(ExecutionContext context, String query) { --- End diff -- Please squash this change into the commit that introduced this unused method. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543246#comment-16543246 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202335205 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. 
+* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass:
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543244#comment-16543244 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202333467 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -143,118 +143,82 @@ case class CatalogAlreadyExistException( } /** - * Exception for not finding a [[TableFormatFactory]] for the - * given properties. + * Exception for not finding a [[TableFactory]] for the given properties. * * @param message message that indicates the current matching step * @param factoryClass required factory class - * @param formatFactories all found factories - * @param properties properties that describe the table format + * @param factories all found factories + * @param properties properties that describe the configuration * @param cause the cause */ -case class NoMatchingTableFormatException( +case class NoMatchingTableFactoryException( message: String, factoryClass: Class[_], - formatFactories: Seq[TableFormatFactory[_]], + factories: Seq[TableFactory], properties: Map[String, String], cause: Throwable) extends RuntimeException( --- End diff -- side note: shouldn't our exception inherit from `FlinkTableException` or sth like that? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. 
For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543238#comment-16543238 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202350170 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala --- @@ -103,24 +103,40 @@ abstract class BatchTableEnvironment( : Unit = { tableSource match { + + // check for proper batch table source --- End diff -- ditto, those changes do not seem to belong to this commit. If so, please extract them into a separate one: ``` git reset --soft HEAD^ git reset HEAD flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala git commit -m "Clean up and simplify changes" git commit -a --fixup=42a8a156d4e6f8f3d119c458350b6c897306fc48 # SHA-1 hash of Shuyi Chen's commit git rebase -i origin/master --autosquash ``` > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543235#comment-16543235 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202282625 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala --- @@ -22,9 +22,11 @@ package org.apache.flink.table.descriptors * Common class for all descriptors describing a table sink. */ abstract class TableSinkDescriptor extends TableDescriptor { + + /** +* Internal method for properties conversion. +*/ override private[flink] def addProperties(properties: DescriptorProperties): Unit = { super.addProperties(properties) -properties.putString(TableDescriptorValidator.TABLE_TYPE, - TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE) --- End diff -- why did those lines disappear? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543232#comment-16543232 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202281623 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -214,15 +216,18 @@ private static TableDescriptor createTableDescriptor(String name, Map properties = new HashMap<>(config); --- End diff -- A copy of `config` named `properties`? This is confusing. Rename `config` to `properties` and inline this copy? Or inline `properties` and rename `normalizedProperties` back to `normalizedConfig`. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542614#comment-16542614 ] ASF GitHub Bot commented on FLINK-8558: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/6323 [FLINK-8558] [FLINK-8866] [table] Finalize unified table source/sink/format interfaces ## What is the purpose of the change This PR finalizes the efforts done in #6264 and #6201 for having unified interfaces for table sources, table sinks, and table formats. It reduces code duplication and cleans up the code base around factories. ## Brief change log - Introduction of `org.apache.flink.table.factories.TableFactory`, a common interface for factories - Introduction of `org.apache.flink.table.factories.TableFormatFactory`, a specific table factory for formats - Specific factories for `StreamTableSource`, `StreamTableSink`, `BatchTableSource`, `BatchTableSink`, `DeserializationSchema`, and `SerializationSchema` - Deprecation of old format-specific table sources (sinks will be deprecated in a follow-up PR) - Possibility to register table source and sink under a common name (table type `both` in SQL Client YAML) ## Verifying this change - Existing tests verify the implementation - Additional ITCases and unit tests have been added - (An end-to-end test will follow in a separate PR) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented?
not documented You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink UnifiedInterfacesFinal Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6323.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6323 commit 980499f887d72ddf9a405c4ad200d0cab15d889c Author: Timo Walther Date: 2018-06-27T11:16:49Z [FLINK-8558] [table] Add unified format interfaces and separate formats from connectors This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility. This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives. commit 42a8a156d4e6f8f3d119c458350b6c897306fc48 Author: Shuyi Chen Date: 2018-06-19T19:00:34Z [FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks This closes #6201.
commit 311dc62e59c0e4146c094b73c21b979f31b2e1d9 Author: Timo Walther Date: 2018-07-11T11:29:03Z Rename to TableFactory and move it to factories package commit 1c581cba61ba321bb6de6a4d298a881840d11cfe Author: Timo Walther Date: 2018-07-11T11:46:31Z Refactor format factories commit 5c6df7598d1f1c3c698ae9b6b35eb37d7fff8295 Author: Timo Walther Date: 2018-07-12T06:35:00Z Unify table factories commit 0cd7c44c006aba21c32d8785d17bfc3dbee03916 Author: Timo Walther Date: 2018-07-12T07:05:50Z Move table type out of descriptors commit 6b83f2e1c0e63147f049dc5389c5633077b789a4 Author: Timo Walther Date: 2018-07-12T08:50:09Z Make source/sink factories environment-dependent commit 4f1255fd003080f078afe6ef67ffa58f40ffec36 Author: Timo Walther Date: 2018-07-12T18:48:45Z Clean up and simplify changes > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major >
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539791#comment-16539791 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/6264 Thanks for the in-depth review @pnowojski. I hope I could address most of your comments. Since this PR heavily overlaps with #6201, and that PR needs a review and some additional work as well, I will close this PR for now and open a PR with a clean unified table sources/sinks/formats story. We can continue the discussions here and I will make sure that changes will be applied to the new PR as well. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539792#comment-16539792 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr closed the pull request at: https://github.com/apache/flink/pull/6264 > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539744#comment-16539744 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201611503 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util + +import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} + +/** + * A factory to create different table format instances. This factory is used with Java's Service + * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized + * properties that describe the desired format. The factory allows for matching to the given set of + * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for + * creating configured instances of format classes accordingly. 
+ * + * Classes that implement this interface need to be added to the + * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in + * the current classpath to be found. + * + * @tparam T record type that the format produces or consumes + */ +trait TableFormatFactory[T] { + + /** +* Specifies the context that this factory has been implemented for. The framework guarantees +* to only use the factory if the specified set of properties and values are met. +* +* Typical properties might be: +* - format.type +* - format.version +* +* Specified property versions allow the framework to provide backwards compatible properties +* in case of string format changes: +* - format.property-version +* +* An empty context means that the factory matches for all requests. +*/ + def requiredContext(): util.Map[String, String] --- End diff -- Context is more accurate as the map also contains properties that are not required. E.g. a user does not have to specify `property-version` in YAML but he can. The required context explains the context for which this factory was implemented for. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). 
A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539672#comment-16539672 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201592609 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util + +import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} + +/** + * A factory to create different table format instances. This factory is used with Java's Service + * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized + * properties that describe the desired format. The factory allows for matching to the given set of + * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for + * creating configured instances of format classes accordingly. 
+ * + * Classes that implement this interface need to be added to the + * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in + * the current classpath to be found. + * + * @tparam T record type that the format produces or consumes + */ +trait TableFormatFactory[T] { + + /** +* Specifies the context that this factory has been implemented for. The framework guarantees +* to only use the factory if the specified set of properties and values are met. +* +* Typical properties might be: +* - format.type +* - format.version +* +* Specified property versions allow the framework to provide backwards compatible properties +* in case of string format changes: +* - format.property-version +* +* An empty context means that the factory matches for all requests. +*/ + def requiredContext(): util.Map[String, String] --- End diff -- I still do not understand this name. Could you think about something more descriptive? It seems to me like this method returns the set of properties that are required to match given factory. Thus `requiredProperties` seems better, but maybe I'm missing something? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). 
A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539673#comment-16539673 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201593060 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.util.Logging + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable + +/** + * Service provider interface for finding a suitable [[TableFormatFactory]] for the + * given properties. 
+ */ +object TableFormatFactoryService extends Logging { + + private lazy val defaultLoader = ServiceLoader.load(classOf[TableFormatFactory[_]]) + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given descriptor. +* +* @param factoryClass desired format factory +* @param descriptor descriptor that describes the format +* @tparam T factory class type +* @return configured instance from factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +find(factoryClass, descriptor, null) + } + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given descriptor and classloader. +* +* @param factoryClass desired format factory +* @param descriptor descriptor that describes the format +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return configured instance from factory +*/ + def find[T]( + factoryClass: Class[T], + descriptor: Descriptor, + classLoader: ClassLoader) +: T = { + +val properties = new DescriptorProperties() +descriptor.addProperties(properties) +find(factoryClass, properties.asMap, classLoader) + } + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given property map. +* +* @param factoryClass desired format factory +* @param propertyMap properties that describes the format +* @tparam T factory class type +* @return configured instance from factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +find(factoryClass, propertyMap, null) + } + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given property map and classloader. 
+* +* @param factoryClass desired format factory +* @param propertyMap properties that describes the format +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return configured instance from factory +*/ + def find[T]( + factoryClass: Class[T], + propertyMap: JMap[String, String], + classLoader: ClassLoader) +: T = { + +val properties = propertyMap.asScala.toMap + +// find matching context +val (foundFactories, contextFactories) = findMatchingContext(properties, classLoader) +if (contextFactories.isEmpty) { + throw new NoMatchingTableFormatException( +"No context matches.", +factoryClass, +
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538683#comment-16538683 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201364539 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestTableFormatFactory.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats.utils + +import java.util + +import org.apache.flink.api.common.serialization.DeserializationSchema +import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator, SchemaValidator} +import org.apache.flink.table.formats.DeserializationSchemaFactory +import org.apache.flink.types.Row + +/** + * Table format factory for testing. + */ +class TestTableFormatFactory extends DeserializationSchemaFactory[Row] { + + override def requiredContext(): util.Map[String, String] = { +val context = new util.HashMap[String, String]() +context.put("format.type", "test-format") --- End diff -- Generally, you are right. 
However, the problem with having a static variable containing a property key is that you can break backwards compatibility while all tests still succeed, because they all reference the common variable. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538680#comment-16538680 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201363632 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactory.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util + +import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} + +/** + * A factory to create different table format instances. This factory is used with Java's Service + * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized + * properties that describe the desired format. The factory allows for matching to the given set of + * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for + * creating configured instances of format classes accordingly. 
+ * + * Classes that implement this interface need to be added to the + * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in + * the current classpath to be found. + * + * @tparam T record type that the format produces or consumes + */ +trait TableFormatFactory[T] { + + /** +* Specifies the context that this factory has been implemented for. The framework guarantees +* to only use the factory if the specified set of properties and values are met. +* +* Typical properties might be: +* - format.type +* - format.version +* +* Specified property versions allow the framework to provide backwards compatible properties +* in case of string format changes: +* - format.property-version +* +* An empty context means that the factory matches for all requests. +*/ + def requiredContext(): util.Map[String, String] --- End diff -- Context defines the "context" in which the factory will be activated. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538679#comment-16538679 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201363167 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { +val props = properties() +props.put("format.property-version", "2") +// the format should still be found + assertNotNull(TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testAmbiguousMoreSupportSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("format.special_path", "/what/ever") +assertTrue( + TableFormatFactoryService +.find(classOf[TableFormatFactory[_]], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousClassBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +assertTrue( + TableFormatFactoryService +.find(classOf[TestAmbiguousTableFormatFactory], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousSchemaBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("schema.weird_field", "unknown") // this is unknown to the schema derivation factory +assertTrue( + TableFormatFactoryService +.find(classOf[TableFormatFactory[_]], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def testMissingClass(): Unit = { +val props = properties() +// this class is not a valid factory +TableFormatFactoryService.find(classOf[TableFormatFactoryServiceTest], props) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def testInvalidContext(): Unit = { +val props = properties() +props.put("format.type", "FAIL") // no context specifies this +TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def 
testUnsupportedProperty(): Unit = { +val props = properties() +props.put("format.path_new", "/new/path") // no factory has this +TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props) + } + + @Test(expected = classOf[AmbiguousTableFormatException]) + def testAmbiguousFactory(): Unit = { --- End diff -- I added a comment to the other test. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature >
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538670#comment-16538670 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201360640 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.util.Logging + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable + +/** + * Service provider interface for finding a suitable [[TableFormatFactory]] for the + * given properties. 
+ */ +object TableFormatFactoryService extends Logging { + + private lazy val defaultLoader = ServiceLoader.load(classOf[TableFormatFactory[_]]) + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given descriptor. +* +* @param factoryClass desired format factory --- End diff -- As mentioned in the description of this PR, I wanted to make this whole format discovery independent from particular interfaces. Maybe we will introduce block-based writer/reader interfaces or async IO interfaces. This should not require changes in the factory service anymore. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538667#comment-16538667 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201359834 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala --- @@ -16,22 +16,32 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.kafka; +package org.apache.flink.table.formats.utils -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010; +import java.util + +import org.apache.flink.table.formats.TableFormatFactory +import org.apache.flink.types.Row /** - * Tests for {@link Kafka010AvroTableSourceFactory}. - */ -public class Kafka010AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase { + * Table format factory for testing. + */ +class TestAmbiguousTableFormatFactory extends TableFormatFactory[Row] { --- End diff -- This implicit interconnection is on purpose. A similar configuration could happen in a SQL Client library folder. The `TableFormatFactoryServiceTest` should fail if they are changed too much. But I added more comments for implementers. If `TestAmbiguousTableFormatFactory` extended `TestTableFormatFactory`, the class-based selection would no longer work. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on.
For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538634#comment-16538634 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201353196 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { +val props = properties() +props.put("format.property-version", "2") +// the format should still be found + assertNotNull(TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testAmbiguousMoreSupportSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("format.special_path", "/what/ever") --- End diff -- Sorry, I should have added more comments to these tests. Will do this now... > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538622#comment-16538622 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201351507 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { +val props = properties() +props.put("format.property-version", "2") --- End diff -- For now we support any property version. The property version should not affect the discovery at the moment. I added a comment to the test. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538613#comment-16538613 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201350063 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory --- End diff -- Yes, we could split the logic here. Would it be ok for you do do this in a follow-up issue? I guess we can combine the logic with `TableSourceFactory` and the upcoming sink discovery. We could introduce a general `FactoryDiscoverer` for all interfaces. 
> Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538604#comment-16538604 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201345819 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.util.Logging + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable + +/** + * Service provider interface for finding a suitable [[TableFormatFactory]] for the + * given properties. + */ +object TableFormatFactoryService extends Logging { --- End diff -- This class is reworked for unified sinks anyway. I will make sure to clean up the duplicate code. 
> Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538601#comment-16538601 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201344640 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { +val props = properties() +props.put("format.property-version", "2") +// the format should still be found + assertNotNull(TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testAmbiguousMoreSupportSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("format.special_path", "/what/ever") +assertTrue( + TableFormatFactoryService +.find(classOf[TableFormatFactory[_]], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousClassBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +assertTrue( + TableFormatFactoryService +.find(classOf[TestAmbiguousTableFormatFactory], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousSchemaBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("schema.weird_field", "unknown") // this is unknown to the schema derivation factory --- End diff -- The format with schema derivation feels not responsible because of this field. But since there is another format that feels responsible no exception is thrown. If `TestAmbiguousTableFormatFactory` would not exist an exception would have been thrown. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. 
Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538578#comment-16538578 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201338537 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. + */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( --- End diff -- No it should never return null. 
> Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538435#comment-16538435 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201309003 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala --- @@ -50,7 +50,8 @@ trait DefinedFieldMapping { * type. It can also provide a mapping for fields which are not in the [[TableSchema]] to make * fields in the physical [[TypeInformation]] accessible for a [[TimestampExtractor]]. * -* @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields. +* @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields or --- End diff -- This is public API that we cannot change easily. I think the reason why Fabian chose null here is also for interoperability between Scala and Java. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538430#comment-16538430 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201308428 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.util.Logging + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable + +/** + * Service provider interface for finding a suitable [[TableFormatFactory]] for the + * given properties. 
+ */ +object TableFormatFactoryService extends Logging { + + private lazy val defaultLoader = ServiceLoader.load(classOf[TableFormatFactory[_]]) + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given descriptor. +* +* @param factoryClass desired format factory +* @param descriptor descriptor that describes the format +* @tparam T factory class type +* @return configured instance from factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +find(factoryClass, descriptor, null) + } + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given descriptor and classloader. +* +* @param factoryClass desired format factory +* @param descriptor descriptor that describes the format +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return configured instance from factory +*/ + def find[T]( + factoryClass: Class[T], + descriptor: Descriptor, + classLoader: ClassLoader) +: T = { --- End diff -- I follow this styling guide: `https://github.com/databricks/scala-style-guide#indent`. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). 
A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538426#comment-16538426 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201307506 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala --- @@ -110,16 +111,44 @@ abstract class BatchTableEnvironment( } } -// TODO expose this once we have enough table source factories that can deal with it -// /** -//* Creates a table from a descriptor that describes the source connector, source encoding, -//* the resulting table schema, and other properties. -//* -//* @param connectorDescriptor connector descriptor describing the source of the table -//*/ -// def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = { -//new BatchTableSourceDescriptor(this, connectorDescriptor) -// } + /** +* Creates a table from a descriptor that describes the source connector, the source format, +* the resulting table schema, and other properties. +* +* Descriptors allow for declaring communication to external systems in an +* implementation-agnostic way. The classpath is scanned for connectors and matching connectors +* are configured accordingly. 
+* +* The following example shows how to read from a Kafka connector using a JSON format and +* creating a table: +* +* {{{ +* +* tableEnv +* .from( +* new Kafka() +* .version("0.11") +* .topic("clicks") +* .property("zookeeper.connect", "localhost") +* .property("group.id", "click-group") +* .startFromEarliest()) +* .withFormat( +* new Json() +* .jsonSchema("{...}") +* .failOnMissingField(false)) +* .withSchema( +* new Schema() +* .field("user-name", "VARCHAR").from("u_name") +* .field("count", "DECIMAL") +* .field("proc-time", "TIMESTAMP").proctime()) +* .toTable() +* }}} +* +* @param connectorDescriptor connector descriptor describing the source of the table +*/ + def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = { --- End diff -- Once we reimplement the environments in Java and introduce proper interfaces for hiding the implementation. This problem will be gone anyway. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537014#comment-16537014 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201003278 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. + */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( --- End diff -- can `TableFormatFactoryService.find` return null? 
If so, please change it to `Option` or `Optional` (or whatever scala equivalent is :() > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537001#comment-16537001 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200990785 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowFormatFactory.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.descriptors.AvroValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FormatDescriptorValidator; +import org.apache.flink.table.formats.DeserializationSchemaFactory; +import org.apache.flink.table.formats.SerializationSchemaFactory; +import org.apache.flink.types.Row; + +import org.apache.avro.specific.SpecificRecord; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Table format factory for providing configured instances of Avro-to-row {@link SerializationSchema} + * and {@link DeserializationSchema}. + */ +public class AvroRowFormatFactory implements SerializationSchemaFactory, DeserializationSchemaFactory { + + @Override + public Map requiredContext() { + final Map context = new HashMap<>(); + context.put(FormatDescriptorValidator.FORMAT_TYPE(), AvroValidator.FORMAT_TYPE_VALUE); + context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1"); + return context; + } + + @Override + public boolean supportsSchemaDerivation() { + return false; + } + + @Override + public List supportedProperties() { + final List properties = new ArrayList<>(); + properties.add(AvroValidator.FORMAT_RECORD_CLASS); + properties.add(AvroValidator.FORMAT_AVRO_SCHEMA); + return properties; + } + + @Override + public DeserializationSchema createDeserializationSchema(Map properties) { + final DescriptorProperties props = new DescriptorProperties(true); --- End diff -- I meant to extract following lines: ``` + final DescriptorProperties props = new DescriptorProperties(true); + props.putProperties(properties); + + // validate + new AvroValidator().validate(props); ``` to a separate private method and use it both in 
`createDeserializationSchema` and `createSerializationSchema`. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537005#comment-16537005 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200997560 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.util.Logging + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable + +/** + * Service provider interface for finding a suitable [[TableFormatFactory]] for the + * given properties. 
+ */ +object TableFormatFactoryService extends Logging { + + private lazy val defaultLoader = ServiceLoader.load(classOf[TableFormatFactory[_]]) + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given descriptor. +* +* @param factoryClass desired format factory +* @param descriptor descriptor that describes the format +* @tparam T factory class type +* @return configured instance from factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +find(factoryClass, descriptor, null) + } + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given descriptor and classloader. +* +* @param factoryClass desired format factory +* @param descriptor descriptor that describes the format +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return configured instance from factory +*/ + def find[T]( + factoryClass: Class[T], + descriptor: Descriptor, + classLoader: ClassLoader) +: T = { --- End diff -- is this how we usually indent start of the function? looks a little bit weird :) > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). 
A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537029#comment-16537029 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201030190 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestWildcardFormatTableFactory.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE} +import org.apache.flink.types.Row + +/** + * Table source factory for testing with a wildcard format ("format.*"). + */ +class TestWildcardFormatTableFactory extends TableSourceFactory[Row] { --- End diff -- Where is this class used? 
> Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537028#comment-16537028 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201025967 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.util.Logging + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable + +/** + * Service provider interface for finding a suitable [[TableFormatFactory]] for the + * given properties. 
+ */ +object TableFormatFactoryService extends Logging { + + private lazy val defaultLoader = ServiceLoader.load(classOf[TableFormatFactory[_]]) + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given descriptor. +* +* @param factoryClass desired format factory --- End diff -- why do we need this param? Shouldn't this be handled either by two instances of this service (one for serialisation and one for deserialisation) or by providing two sets of methods like `~findSerialisationFactory` and `~findDeserialisationFactory`? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537003#comment-16537003 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200993332 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala --- @@ -110,16 +111,44 @@ abstract class BatchTableEnvironment( } } -// TODO expose this once we have enough table source factories that can deal with it -// /** -//* Creates a table from a descriptor that describes the source connector, source encoding, -//* the resulting table schema, and other properties. -//* -//* @param connectorDescriptor connector descriptor describing the source of the table -//*/ -// def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = { -//new BatchTableSourceDescriptor(this, connectorDescriptor) -// } + /** +* Creates a table from a descriptor that describes the source connector, the source format, +* the resulting table schema, and other properties. +* +* Descriptors allow for declaring communication to external systems in an +* implementation-agnostic way. The classpath is scanned for connectors and matching connectors +* are configured accordingly. 
+* +* The following example shows how to read from a Kafka connector using a JSON format and +* creating a table: +* +* {{{ +* +* tableEnv +* .from( +* new Kafka() +* .version("0.11") +* .topic("clicks") +* .property("zookeeper.connect", "localhost") +* .property("group.id", "click-group") +* .startFromEarliest()) +* .withFormat( +* new Json() +* .jsonSchema("{...}") +* .failOnMissingField(false)) +* .withSchema( +* new Schema() +* .field("user-name", "VARCHAR").from("u_name") +* .field("count", "DECIMAL") +* .field("proc-time", "TIMESTAMP").proctime()) +* .toTable() +* }}} +* +* @param connectorDescriptor connector descriptor describing the source of the table +*/ + def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = { --- End diff -- I'm not sure. Maintaining duplicated comment in three places, especially one that's lie this one that non trivial, is a futile task. They will drift out of sync. For me the only question is If we can avoid it in some acceptable way. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537026#comment-16537026 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201027539 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { +val props = properties() +props.put("format.property-version", "2") +// the format should still be found + assertNotNull(TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testAmbiguousMoreSupportSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("format.special_path", "/what/ever") +assertTrue( + TableFormatFactoryService +.find(classOf[TableFormatFactory[_]], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousClassBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +assertTrue( + TableFormatFactoryService +.find(classOf[TestAmbiguousTableFormatFactory], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousSchemaBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("schema.weird_field", "unknown") // this is unknown to the schema derivation factory +assertTrue( + TableFormatFactoryService +.find(classOf[TableFormatFactory[_]], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def testMissingClass(): Unit = { +val props = properties() +// this class is not a valid factory +TableFormatFactoryService.find(classOf[TableFormatFactoryServiceTest], props) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def testInvalidContext(): Unit = { +val props = properties() +props.put("format.type", "FAIL") // no context specifies this +TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def 
testUnsupportedProperty(): Unit = { +val props = properties() +props.put("format.path_new", "/new/path") // no factory has this +TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props) + } + + @Test(expected = classOf[AmbiguousTableFormatException]) + def testAmbiguousFactory(): Unit = { --- End diff -- how does this differ from `testAmbiguousSchemaBasedSelection`? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink >
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537027#comment-16537027 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201020483 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/utils/TestAmbiguousTableFormatFactory.scala --- @@ -16,22 +16,32 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.kafka; +package org.apache.flink.table.formats.utils -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010; +import java.util + +import org.apache.flink.table.formats.TableFormatFactory +import org.apache.flink.types.Row /** - * Tests for {@link Kafka010AvroTableSourceFactory}. - */ -public class Kafka010AvroTableSourceFactoryTest extends KafkaAvroTableSourceFactoryTestBase { + * Table format factory for testing. 
+ */ +class TestAmbiguousTableFormatFactory extends TableFormatFactory[Row] { + + override def requiredContext(): util.Map[String, String] = { +val context = new util.HashMap[String, String]() +context.put("format.type", "test-format") +context.put("format.property-version", "1") +context + } - @Override - protected String version() { - return CONNECTOR_VERSION_VALUE_010; - } + override def supportsSchemaDerivation(): Boolean = false // no schema derivation - @Override - protected KafkaAvroTableSource.Builder builder() { - return Kafka010AvroTableSource.builder(); - } + override def supportedProperties(): util.List[String] = { +val properties = new util.ArrayList[String]() +// has no 'format.important' --- End diff -- move this comment to the class's Javadoc > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537012#comment-16537012 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200999721 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.utils.TableTestBase +import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ + +class TableSourceDescriptorTest extends TableTestBase { + + @Test + def testStreamTableSourceDescriptor(): Unit = { +val util = streamTestUtil() +val desc = util.tableEnv + .from( +FileSystem() + .path("/path/to/csv")) + .withFormat( +Csv() + .field("myfield", Types.STRING) + .field("myfield2", Types.INT) + .quoteCharacter(';') + .fieldDelimiter("#") + .lineDelimiter("\r\n") + .commentPrefix("%%") + .ignoreFirstLine() + .ignoreParseErrors()) +.withSchema( + Schema() +.field("myfield", Types.STRING) +.field("myfield2", Types.INT) +.field("proctime", Types.SQL_TIMESTAMP).proctime() +) +val expected = Seq( --- End diff -- Ditto. `expectedProperties`? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536999#comment-16536999 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200987026 --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java --- @@ -18,48 +18,99 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; +import java.util.List; +import java.util.Map; import java.util.Properties; /** * Kafka {@link StreamTableSource} for Kafka 0.10. */ -@PublicEvolving -public abstract class Kafka010TableSource extends KafkaTableSource { +@Internal +public class Kafka010TableSource extends KafkaTableSource { - // The deserialization schema for the Kafka records - private final DeserializationSchema deserializationSchema; + /** +* Creates a Kafka 0.10 {@link StreamTableSource}. +* +* @param schema Schema of the produced table. +* @param proctimeAttribute Field name of the processing time attribute, null if no --- End diff -- This is kind of controversial topic. Generally speaking I suspect that Java discourage to use `Optional` beside return values because we should use `@Nullable` or not use any of them. 
However, in projects that have ignored the `@Nullable` annotation (such as Flink), it's virtually impossible to start using them, and thus using `Optional` is the only way to have compiler control over optional/nullable fields. In this particular use case of "optional" arguments my preference hierarchy is: 1. provide a builder for this class 2. provide an alternative constructor without this argument 3. use `@Nullable` with enabled compile errors on incorrectly handled `@Nullable` annotations 4. use `Optional` ... 1336. use `@Nullable` WITHOUT compile errors on incorrectly handled `@Nullable` annotations 1337. use a nullable argument without the `@Nullable` annotation. The last two options are out of the question for me, since `1337` is evil and `1336` doesn't improve the situation. Option `3` is sadly impossible for Flink. The same logic applies for me to other use cases (like fields, return values, etc.): 1. avoid nulls/optionals (for example via builders or named parameters with default values) 2. use `@Nullable` with compiler errors 3. use `Optional` > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537017#comment-16537017 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201008748 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.util.Logging + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable + +/** + * Service provider interface for finding a suitable [[TableFormatFactory]] for the + * given properties. + */ +object TableFormatFactoryService extends Logging { --- End diff -- Please deduplicate this code with `TableSourceFactoryService`. 
I haven't looked into this thoroughly, so I'm not sure whether you should extract some static methods or introduce a base class, but there is some code duplication, like: ``` val iter = ... val requiredContextJava = ... val plainContext = ... val plainProperties = ... ``` etc. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there, but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns, we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537018#comment-16537018 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201003919 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { +val props = properties() +props.put("format.property-version", "2") +// the format should still be found + assertNotNull(TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testAmbiguousMoreSupportSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("format.special_path", "/what/ever") +assertTrue( + TableFormatFactoryService +.find(classOf[TableFormatFactory[_]], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousClassBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +assertTrue( + TableFormatFactoryService +.find(classOf[TestAmbiguousTableFormatFactory], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousSchemaBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("schema.weird_field", "unknown") // this is unknown to the schema derivation factory --- End diff -- Shouldn't we throw an error on not recognised/unused properties? Silently ignoring such properties is prone for typos. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. 
For better separation of > concerns, we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537025#comment-16537025 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201026527 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { +val props = properties() +props.put("format.property-version", "2") +// the format should still be found + assertNotNull(TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testAmbiguousMoreSupportSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("format.special_path", "/what/ever") +assertTrue( + TableFormatFactoryService +.find(classOf[TableFormatFactory[_]], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousClassBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +assertTrue( + TableFormatFactoryService +.find(classOf[TestAmbiguousTableFormatFactory], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test + def testAmbiguousSchemaBasedSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("schema.weird_field", "unknown") // this is unknown to the schema derivation factory +assertTrue( + TableFormatFactoryService +.find(classOf[TableFormatFactory[_]], props) +.isInstanceOf[TestAmbiguousTableFormatFactory]) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def testMissingClass(): Unit = { +val props = properties() +// this class is not a valid factory +TableFormatFactoryService.find(classOf[TableFormatFactoryServiceTest], props) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def testInvalidContext(): Unit = { +val props = properties() +props.put("format.type", "FAIL") // no context specifies this +TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props) + } + + @Test(expected = classOf[NoMatchingTableFormatException]) + def 
testUnsupportedProperty(): Unit = { +val props = properties() +props.put("format.path_new", "/new/path") // no factory has this --- End diff -- `s/path_new/property_not_defined_by_any_factory/g` and remove comment > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels:
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537006#comment-16537006 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200992037 --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.json; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FormatDescriptorValidator; +import org.apache.flink.table.descriptors.JsonValidator; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.formats.DeserializationSchemaFactory; +import org.apache.flink.table.formats.SerializationSchemaFactory; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Table format factory for providing configured instances of JSON-to-row {@link SerializationSchema} + * and {@link DeserializationSchema}. + */ +public class JsonRowFormatFactory implements SerializationSchemaFactory, DeserializationSchemaFactory { + + @Override + public Map requiredContext() { + final Map context = new HashMap<>(); + context.put(FormatDescriptorValidator.FORMAT_TYPE(), JsonValidator.FORMAT_TYPE_VALUE); + context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1"); + return context; + } + + @Override + public boolean supportsSchemaDerivation() { + return true; + } + + @Override + public List supportedProperties() { + final List properties = new ArrayList<>(); + properties.add(JsonValidator.FORMAT_JSON_SCHEMA); + properties.add(JsonValidator.FORMAT_SCHEMA); + properties.add(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD); + properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA()); + properties.addAll(SchemaValidator.getSchemaDerivationKeys()); + return properties; + } + + @Override + public DeserializationSchema createDeserializationSchema(Map properties) { + final DescriptorProperties props = new 
DescriptorProperties(true); --- End diff -- ditto regarding `props` and extracting duplicated code > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there, but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns, we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537011#comment-16537011 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200999523 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.utils.TableTestBase +import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ + +class TableSourceDescriptorTest extends TableTestBase { + + @Test + def testStreamTableSourceDescriptor(): Unit = { +val util = streamTestUtil() +val desc = util.tableEnv --- End diff -- Not typed `desc` doesn't tell me what this field is (I even can not suggest how to rename it). 
> Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there, but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns, we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537008#comment-16537008 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200998167 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala --- @@ -50,7 +50,8 @@ trait DefinedFieldMapping { * type. It can also provide a mapping for fields which are not in the [[TableSchema]] to make * fields in the physical [[TypeInformation]] accessible for a [[TimestampExtractor]]. * -* @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields. +* @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields or --- End diff -- `Optional` :( > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536997#comment-16536997 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200983274 --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java --- @@ -32,7 +33,13 @@ /** * Kafka {@link StreamTableSource} for Kafka 0.10. + * + * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together + * with descriptors for schema and format instead. Descriptors allow for + * implementation-agnostic definition of tables. See also + * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. */ +@Deprecated --- End diff -- As we talked offline. I wasn't sure if we could drop `PublicEvolving` classes instead of deprecating them. But if you want to deprecate them for one release, it might be better idea to do so. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537021#comment-16537021 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201017796 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { +val props = properties() +props.put("format.property-version", "2") +// the format should still be found + assertNotNull(TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) --- End diff -- shouldn't you test what was found? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537004#comment-16537004 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200988428 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -55,50 +57,101 @@ */ @Internal public abstract class KafkaTableSource - implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes { + implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes, DefinedFieldMapping { + + // common table source attributes + // TODO make all attributes final once we drop support for format-specific table sources /** The schema of the table. */ private final TableSchema schema; + /** Field name of the processing time attribute, null if no processing time field is defined. */ + private String proctimeAttribute; + + /** Descriptor for a rowtime attribute. */ + private List rowtimeAttributeDescriptors; + + /** Mapping for the fields of the table schema to fields of the physical returned type or null. */ + private Map fieldMapping; --- End diff -- please check my other comment about that. `@Nullable` without compiler errors is not in any way better :( > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. 
For better separation of > concerns, we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537024#comment-16537024 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201019023 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory +import org.junit.Assert.{assertNotNull, assertTrue} +import org.junit.Test + +/** + * Tests for [[TableFormatFactoryService]]. 
+ */ +class TableFormatFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { +val props = properties() +assertNotNull( + TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testDifferentContextVersion(): Unit = { +val props = properties() +props.put("format.property-version", "2") +// the format should still be found + assertNotNull(TableFormatFactoryService.find(classOf[TableFormatFactory[_]], props)) + } + + @Test + def testAmbiguousMoreSupportSelection(): Unit = { +val props = properties() +props.remove("format.important") +props.put("format.special_path", "/what/ever") --- End diff -- How does this test differs from `testAmbiguousClassBasedSelection`? What does it test? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537015#comment-16537015 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201002716 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} + +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.util.Logging + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable + +/** + * Service provider interface for finding a suitable [[TableFormatFactory]] for the + * given properties. 
+ */ +object TableFormatFactoryService extends Logging { + + private lazy val defaultLoader = ServiceLoader.load(classOf[TableFormatFactory[_]]) + + /** +* Finds a table format factory of the given class and creates configured instances from the +* given descriptor. +* +* @param factoryClass desired format factory +* @param descriptor descriptor that describes the format +* @tparam T factory class type +* @return configured instance from factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +find(factoryClass, descriptor, null) --- End diff -- `Option[ClassLoader]` - same argument as before regarding nulls > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537007#comment-16537007 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200998882 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala --- @@ -139,9 +139,14 @@ object TableSourceFactoryService extends Logging { Seq[String]() } +// extract wildcard prefixes --- End diff -- Please extract at least both `// extract wildcard prefixes` and `// check for supported properties` into separate methods (this method is already too large). > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537002#comment-16537002 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200991456 --- Diff: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowFormatFactoryTest.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.table.descriptors.Avro; +import org.apache.flink.table.descriptors.Descriptor; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.formats.DeserializationSchemaFactory; +import org.apache.flink.table.formats.SerializationSchemaFactory; +import org.apache.flink.table.formats.TableFormatFactoryService; + +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link AvroRowFormatFactory}. + */ +public class AvroRowFormatFactoryTest { --- End diff -- Hmm, @tillrohrmann was always bugging me about extending from `TestLogger` since it helps in case of deadlocks. However I might be missing something? > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discovery available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537020#comment-16537020 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201016561 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/formats/TableFormatFactoryServiceTest.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.formats + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.{AmbiguousTableFormatException, NoMatchingTableFormatException} +import org.apache.flink.table.formats.utils.TestAmbiguousTableFormatFactory --- End diff -- Am I guessing correctly, that this test is searching the class path for classes that extend `TableFormatFactory` and tries to instantiate/match them to the properties below? This reminds me too much of testing static code or code with global variables. Also it probably shows problem with encapsulation/code reusability. Shouldn't `TableFormatFactoryService` be split into following components: 1. 
`TableFormatFactoryDiscoverer` or an even more generic `new ClassDiscoverer(classOf[TableFormatFactory])` - class that searches for the instances of `TableFormatFactory` and returns them as a list 2. `new TableFormatFactoryService(formats: Seq[Class[TableFormatFactory]])` that validates properties and instantiates format factories 3. some tiny class/code that glues 1. and 2. together. Testing those components would be easier and less implicit compared to how it is now. For example, right now it is unclear to me which format factories affect this test and in what way. > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537016#comment-16537016 ] ASF GitHub Bot commented on FLINK-8558: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r201001341 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.utils.TableTestBase +import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ + +class TableSourceDescriptorTest extends TableTestBase { + + @Test + def testStreamTableSourceDescriptor(): Unit = { +val util = streamTestUtil() +val desc = util.tableEnv + .from( +FileSystem() + .path("/path/to/csv")) + .withFormat( +Csv() + .field("myfield", Types.STRING) + .field("myfield2", Types.INT) + .quoteCharacter(';') + .fieldDelimiter("#") + .lineDelimiter("\r\n") + .commentPrefix("%%") + .ignoreFirstLine() + .ignoreParseErrors()) +.withSchema( + Schema() +.field("myfield", Types.STRING) +.field("myfield2", Types.INT) +.field("proctime", Types.SQL_TIMESTAMP).proctime() +) +val expected = Seq( + "connector.property-version" -> "1", + "connector.type" -> "filesystem", + "connector.path" -> "/path/to/csv", + "format.property-version" -> "1", + "format.type" -> "csv", + "format.fields.0.name" -> "myfield", + "format.fields.0.type" -> "VARCHAR", + "format.fields.1.name" -> "myfield2", + "format.fields.1.type" -> "INT", + "format.quote-character" -> ";", + "format.field-delimiter" -> "#", + "format.line-delimiter" -> "\r\n", + "format.comment-prefix" -> "%%", + "format.ignore-first-line" -> "true", + "format.ignore-parse-errors" -> "true", + "schema.0.name" -> "myfield", + "schema.0.type" -> "VARCHAR", + "schema.1.name" -> "myfield2", + "schema.1.type" -> "INT", + "schema.2.name" -> "proctime", + "schema.2.type" -> "TIMESTAMP", + "schema.2.proctime" -> "true" +).toMap + +val actual = new DescriptorProperties(true) +desc.addProperties(actual) + +assertEquals(expected.asJava, actual.asMap) + } + + @Test + def testBatchTableSourceDescriptor(): Unit = { --- End diff -- please deduplicate this with streaming case. It seems like they differ only on used `util` type (that have common subtype) and one field. 
Thus you could create a common private method ``` @Test def testBatchTableSourceDescriptor = testTableSourceDescriptor(false) @Test def testStreamingTableSourceDescriptor = testTableSourceDescriptor(true) def testTableSourceDescriptor(isStreaming: Boolean) { // deduplicated code here with 3 small if's } > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as >