[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380880#comment-16380880
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r171354820
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
@@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
 object SchemaValidator {
 
   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_NAME = "name"
+  val SCHEMA_TYPE = "type"
+  val SCHEMA_PROCTIME = "proctime"
+  val SCHEMA_FROM = "from"
+
+  // utilities
+
+  /**
+    * Finds the proctime attribute if defined.
+    */
+  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val isProctime = toScala(
+        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
+      isProctime.foreach { isSet =>
+        if (isSet) {
+          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
+        }
+      }
+    }
+    toJava(None)
+  }
+
+  /**
+    * Finds the rowtime attributes if defined.
+    */
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+    : util.List[RowtimeAttributeDescriptor] = {
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+    // check for rowtime in every field
+    for (i <- 0 until names.size) {
+      RowtimeValidator
+        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
+        .foreach { case (extractor, strategy) =>
+          // create descriptor
+          attributes += new RowtimeAttributeDescriptor(
+            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
+            extractor,
+            strategy)
+        }
+    }
+
+    attributes.asJava
+  }
+
+  /**
+    * Finds a table source field mapping.
+    */
+  def deriveFieldMapping(
+      properties: DescriptorProperties,
+      sourceSchema: Optional[TableSchema])
+    : util.Map[String, String] = {
+
+    val mapping = mutable.Map[String, String]()
+
+    val schema = properties.getTableSchema(SCHEMA)
+
+    // add all schema fields first for implicit mappings
+    schema.getColumnNames.foreach { name =>
+      mapping.put(name, name)
+    }
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
+      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
 
-  // per column properties
+        // add explicit mapping
+        case Some(source) =>
+          mapping.put(name, source)
 
-  val NAME = "name"
-  val TYPE = "type"
-  val PROCTIME = "proctime"
-  val PROCTIME_VALUE_TRUE = "true"
-  val FROM = "from"
+        // implicit mapping or time
+        case None =>
+          val isProctime = properties
+            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+            .orElse(false)
+          val isRowtime = properties
+            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+          // remove proctime/rowtime from mapping
+          if (isProctime || isRowtime) {
+            mapping.remove(name)
+          }
+          // check for invalid fields
+          else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
+            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
+              s"from source. Please specify the source field from which it can be derived.")
+          }
+      }
+    }
 
+    mapping.toMap.asJava
+  }
+
+  /**
+    * Finds the fields that can be used for a format schema (without time attributes).
+    */
+  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
--- End diff --

Thanks for your explanation @twalthr. I totally agree that we should avoid making users define schemas multiple times. Since the names and definitions still confuse me, I'd like to share my understanding to see if it's correct. Let's take the KafkaJsonTableSource as an example. Briefly, the schema mapping can be illustrated as follows:

[illustration truncated in the archive]

[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380514#comment-16380514
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r171292456
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala --- (same SchemaValidator.scala hunk as quoted above)
--- End diff --

No problem @xccui. My goal was to allow users to specify all fields only once, because users often have tables with 30+ columns. When I opened the PR, I added the possibility to derive a `schema` from a `format` schema. But according to a SQL DDL statement `CREATE TABLE (..) [FORMAT] ...`, the `schema` must always be complete and the `format` schema might be derived, 
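As a minimal Java sketch of this direction of derivation (the helper name and inputs are hypothetical, not the PR's API): given the complete table `schema`, the format-side fields can be derived by dropping time attributes and resolving explicit `from` renames. Note this is a simplification — how rowtime fields that exist in the input format should be treated is exactly what is being discussed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FormatFieldDerivation {
    // Hypothetical helper: derive the format-side field names from the
    // complete table schema. Time attributes are dropped because they do
    // not exist in the physical format; a "from" entry renames a schema
    // field back to its source field.
    static List<String> deriveFormatFields(List<String> schemaFields,
                                           Map<String, String> from,
                                           Set<String> timeAttributes) {
        List<String> formatFields = new ArrayList<>();
        for (String field : schemaFields) {
            if (timeAttributes.contains(field)) {
                continue; // proctime/rowtime: not part of the format
            }
            formatFields.add(from.getOrDefault(field, field));
        }
        return formatFields;
    }

    public static void main(String[] args) {
        List<String> schema = List.of("user_id", "amount", "ptime");
        Map<String, String> from = Map.of("user_id", "uid");
        Set<String> timeAttrs = Set.of("ptime");
        System.out.println(deriveFormatFields(schema, from, timeAttrs)); // prints "[uid, amount]"
    }
}
```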

[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380469#comment-16380469
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r171280725
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala --- (same SchemaValidator.scala hunk as quoted above)
--- End diff --

Hi @twalthr, sorry for mentioning you again. I was a little confused about this method. Could you explain its usage? Besides, the rowtime field should be an existing field in the input format. Why is it removed here?

Thanks, Xingcan
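As a rough standalone sketch of the lookup being discussed (plain Java maps stand in for `DescriptorProperties`, so the helper and the key layout are illustrative only), finding the proctime attribute boils down to scanning the indexed `schema.<i>.*` keys:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class ProctimeScan {
    // Illustrative stand-in for deriveProctimeAttribute: find the field
    // whose "schema.<i>.proctime" flag is set and return its name.
    static Optional<String> deriveProctime(Map<String, String> props, int fieldCount) {
        for (int i = 0; i < fieldCount; i++) {
            if ("true".equals(props.get("schema." + i + ".proctime"))) {
                return Optional.ofNullable(props.get("schema." + i + ".name"));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("schema.0.name", "user_id");
        props.put("schema.1.name", "ptime");
        props.put("schema.1.proctime", "true");
        System.out.println(deriveProctime(props, 2).orElse("none")); // prints "ptime"
    }
}
```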


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379148#comment-16379148
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5564


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory, a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379044#comment-16379044
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5564
  
Thank you @fhueske. Will merge...




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379026#comment-16379026
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5564
  
Thanks for the update. 
I think this is good to merge.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376735#comment-16376735
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170569676
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
--- End diff --

I wouldn't add another sub-property. It already looks complicated enough in YAML.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376730#comment-16376730
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170569299
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala --- (same RowtimeValidator.scala hunk as quoted above; this comment is anchored at `ROWTIME_TIMESTAMPS_FROM`)
--- End diff --

We also use this `from` for `schema`. `from-field` is already defined in 
`rowtime.timestamps.type`.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376696#comment-16376696
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170562304
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * The validator for {@link Kafka}.
+ */
+public class KafkaValidator extends ConnectorDescriptorValidator {
+
+   public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
+   public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
+   public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
+   public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
+   public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
+   public static final String CONNECTOR_TOPIC = "connector.topic";
+   public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+   public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
+   public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION = "partition";
+   public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
+   public static final String CONNECTOR_PROPERTIES = "connector.properties";
--- End diff --

Yes, these are very Kafka-specific properties. That's also where they are pushed down by the table source builder.
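A small sketch of what pushing such properties down could look like. The `.key`/`.value` sub-key layout under `connector.properties.<i>` is an assumption for illustration, not taken from the PR; the idea is only that indexed descriptor entries are flattened into `java.util.Properties` for the Kafka consumer.

```java
import java.util.Map;
import java.util.Properties;

public class KafkaPropertiesFlattening {
    // Sketch only: the ".key"/".value" sub-key layout is assumed here.
    // Collect indexed "connector.properties.<i>" pairs into
    // java.util.Properties that a Kafka consumer can use directly.
    static Properties toKafkaProperties(Map<String, String> descriptor) {
        Properties result = new Properties();
        for (int i = 0; ; i++) {
            String key = descriptor.get("connector.properties." + i + ".key");
            String value = descriptor.get("connector.properties." + i + ".value");
            if (key == null || value == null) {
                break; // indexed list is exhausted
            }
            result.setProperty(key, value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> descriptor = Map.of(
            "connector.properties.0.key", "bootstrap.servers",
            "connector.properties.0.value", "localhost:9092",
            "connector.properties.1.key", "group.id",
            "connector.properties.1.value", "test-group");
        System.out.println(toKafkaProperties(descriptor).getProperty("group.id")); // prints "test-group"
    }
}
```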




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376692#comment-16376692
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170561781
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     }
   }
 
+  /**
+    * Adds an indexed mapping of properties under a common key.
+    *
+    * For example:
+    *
+    * schema.fields.0.type = INT, schema.fields.0.name = test
+    * schema.fields.1.name = test2
+    *
+    * The arity of the propertySets can differ.
+    *
+    * This method is intended for Java code.
+    */
+  def putIndexedVariableProperties(
+      key: String,
+      propertySets: JList[JMap[String, String]])
+    : Unit = {
+    checkNotNull(key)
+    checkNotNull(propertySets)
+    putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
+  }
+
   // --------------------------------------------------------------------------
 
+  /**
+    * Returns a string value under the given key if it exists.
+    */
   def getString(key: String): Option[String] = {
     properties.get(key)
   }
 
-  def getCharacter(key: String): Option[Character] = getString(key) match {
-    case Some(c) =>
-      if (c.length != 1) {
-        throw new ValidationException(s"The value of $key must only contain one character.")
-      }
-      Some(c.charAt(0))
+  /**
+    * Returns a string value under the given key if it exists.
+    * This method is intended for Java code.
+    */
+  def getOptionalString(key: String): Optional[String] = toJava(getString(key))
 
-    case None => None
+  /**
+    * Returns a character value under the given key if it exists.
+    */
+  def getCharacter(key: String): Option[Character] = getString(key).map { c =>
+    if (c.length != 1) {
+      throw new ValidationException(s"The value of $key must only contain one character.")
+    }
+    c.charAt(0)
   }
 
-  def getBoolean(key: String): Option[Boolean] = getString(key) match {
-    case Some(b) => Some(JBoolean.parseBoolean(b))
-
-    case None => None
+  /**
+    * Returns a class value under the given key if it exists.
+    */
+  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
--- End diff --

We need to provide the superclass to validate what we just deserialized; otherwise it could later lead to a ClassCastException.
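The JDK itself supports this pattern. A minimal Java sketch (hypothetical helper name, standard `Class.forName`/`asSubclass` calls) showing why validating against the superclass up front is preferable to a deferred ClassCastException:

```java
import java.util.List;

public class ClassValidation {
    // Validate a dynamically loaded class against an expected supertype
    // at load time: asSubclass throws ClassCastException here, during
    // validation, instead of later at the first use of an instance.
    static <T> Class<? extends T> loadValidated(String className, Class<T> superClass)
            throws ClassNotFoundException {
        return Class.forName(className).asSubclass(superClass);
    }

    public static void main(String[] args) throws Exception {
        // ArrayList implements List, so this passes validation.
        System.out.println(loadValidated("java.util.ArrayList", List.class).getSimpleName()); // prints "ArrayList"
        try {
            // String is not a List: rejected immediately, not at use time.
            loadValidated("java.lang.String", List.class);
        } catch (ClassCastException e) {
            System.out.println("rejected: java.lang.String is not a List");
        }
    }
}
```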




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374620#comment-16374620
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170304145
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-    * Returns a valid descriptor.
+    * Returns a set of valid descriptors.
+    * This method is implemented in both Scala and Java.
+    */
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+    * Returns a set of properties for each valid descriptor.
+    * This code is implemented in both Scala and Java.
     */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-    * Returns a validator that can validate this descriptor.
+    * Returns a validator that can validate all valid descriptors.
     */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+    val d = descriptors().asScala
+    val p = properties().asScala
+
+    Preconditions.checkArgument(d.length == p.length)
+
+    d.zip(p).foreach { case (desc, props) =>
+      verifyProperties(desc, props.asScala.toMap)
+    }
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
     val normProps = new DescriptorProperties
     descriptor.addProperties(normProps)
-    assertEquals(expected.toMap, normProps.asMap)
+    assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+  def verifyInvalidProperty(
+      descriptor: Descriptor,
+      property: String,
+      invalidValue: String): Unit = {
     val properties = new DescriptorProperties
-    descriptor().addProperties(properties)
+    descriptor.addProperties(properties)
     properties.unsafePut(property, invalidValue)
     validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
     val properties = new DescriptorProperties
-    descriptor().addProperties(properties)
+    descriptor.addProperties(properties)
     properties.unsafeRemove(removeProperty)
     validator().validate(properties)
   }
 }
+
+class TestTableSourceDescriptor(connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
--- End diff --

Sorry about that. I forgot to rebuild after rebasing.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374593#comment-16374593
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170297941
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
--- End diff --

rename to `removePropertyAndVerify()`




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374575#comment-16374575
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170229924
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -89,37 +105,58 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 put(key, clazz.getName)
   }
 
+  /**
+* Adds a string under the given key.
+*/
   def putString(key: String, str: String): Unit = {
 checkNotNull(key)
 checkNotNull(str)
 put(key, str)
   }
 
+  /**
+* Adds a boolean under the given key.
+*/
   def putBoolean(key: String, b: Boolean): Unit = {
 checkNotNull(key)
 put(key, b.toString)
   }
 
+  /**
+* Adds a long under the given key.
+*/
   def putLong(key: String, l: Long): Unit = {
 checkNotNull(key)
 put(key, l.toString)
   }
 
+  /**
+* Adds an integer under the given key.
+*/
   def putInt(key: String, i: Int): Unit = {
 checkNotNull(key)
 put(key, i.toString)
   }
 
+  /**
+* Adds a character under the given key.
+*/
   def putCharacter(key: String, c: Character): Unit = {
 checkNotNull(key)
 checkNotNull(c)
 put(key, c.toString)
   }
 
+  /**
+* Adds a table schema under the given key.
+*/
   def putTableSchema(key: String, schema: TableSchema): Unit = {
 putTableSchema(key, normalizeTableSchema(schema))
   }
 
+  /**
+* Adds a table schema under the given key.
+*/
  def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = {
--- End diff --

Remove?




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374581#comment-16374581
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230962
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
+  key: String,
+  propertySets: JList[JMap[String, String]])
+: Unit = {
+checkNotNull(key)
+checkNotNull(propertySets)
+putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
+  }
+
   // 
--
 
+  /**
+* Returns a string value under the given key if it exists.
+*/
   def getString(key: String): Option[String] = {
 properties.get(key)
   }
 
-  def getCharacter(key: String): Option[Character] = getString(key) match {
-case Some(c) =>
-  if (c.length != 1) {
-throw new ValidationException(s"The value of $key must only contain one character.")
-  }
-  Some(c.charAt(0))
+  /**
+* Returns a string value under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalString(key: String): Optional[String] = toJava(getString(key))
 
-case None => None
+  /**
+* Returns a character value under the given key if it exists.
+*/
+  def getCharacter(key: String): Option[Character] = getString(key).map { c =>
+if (c.length != 1) {
+  throw new ValidationException(s"The value of $key must only contain one character.")
+}
+c.charAt(0)
   }
 
-  def getBoolean(key: String): Option[Boolean] = getString(key) match {
-case Some(b) => Some(JBoolean.parseBoolean(b))
-
-case None => None
+  /**
+* Returns a class value under the given key if it exists.
+*/
+  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
+properties.get(key).map { name =>
+  val clazz = try {
+Class.forName(
+  name,
+  true,
+  Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]]
+  } catch {
+case e: Exception =>
+  throw new ValidationException(s"Coult not get class for key 
'$key'.", e)
--- End diff --

Add name of class?
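For context, the indexed-property layout that the diff above describes for `putIndexedVariableProperties` can be sketched in plain Java (a standalone illustration with made-up names, not the actual Flink `DescriptorProperties` class):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IndexedProps {

    // Flattens a list of property sets under "<key>.<index>.<subKey>",
    // mirroring the layout described in the Scaladoc above. The arity of
    // the individual sets may differ.
    public static Map<String, String> flatten(String key, List<Map<String, String>> sets) {
        Map<String, String> out = new LinkedHashMap<>();
        for (int i = 0; i < sets.size(); i++) {
            for (Map.Entry<String, String> e : sets.get(i).entrySet()) {
                out.put(key + "." + i + "." + e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, String>> fields = new ArrayList<>();
        Map<String, String> f0 = new LinkedHashMap<>();
        f0.put("name", "test");
        f0.put("type", "INT");
        Map<String, String> f1 = new LinkedHashMap<>();
        f1.put("name", "test2"); // second entry has fewer properties
        fields.add(f0);
        fields.add(f1);
        System.out.println(flatten("schema.fields", fields));
        // → {schema.fields.0.name=test, schema.fields.0.type=INT, schema.fields.1.name=test2}
    }
}
```

This is why `getIndexedProperty` can later scan `schema.fields.<i>.name` by index, as seen in `deriveProctimeAttribute`.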




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374594#comment-16374594
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170296912
  
--- Diff: 
flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.ValidationException;
+
+/**
+  * Validator for {@link Json}.
+  */
+public class JsonValidator extends FormatDescriptorValidator {
+
+   public static final String FORMAT_TYPE_VALUE = "json";
+   public static final String FORMAT_SCHEMA = "format.schema";
+   public static final String FORMAT_JSON_SCHEMA = "format.json-schema";
+   public static final String FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field";
+
+   @Override
+   public void validate(DescriptorProperties properties) {
+   super.validate(properties);
+   final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA);
+   final boolean hasSchemaString = properties.containsKey(FORMAT_JSON_SCHEMA);
+   if (hasSchema && hasSchemaString) {
+   throw new ValidationException("A definition of both a schema and JSON schema is not allowed.");
+   } else if (!hasSchema && !hasSchemaString) {
+   throw new ValidationException("A definition of a schema and JSON schema is required.");
--- End diff --

replace "and" by "or" -> "A definition of a schema or JSON schema is required."
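The mutual-exclusion check under review can be illustrated with a standalone sketch (plain Java, not the actual `JsonValidator`; the `ValidationException` is replaced by a returned message for brevity, with the reviewer's corrected wording):

```java
import java.util.HashMap;
import java.util.Map;

public class JsonSchemaCheck {

    static final String FORMAT_SCHEMA = "format.schema";
    static final String FORMAT_JSON_SCHEMA = "format.json-schema";

    // Exactly one of the two schema keys must be present: both set and
    // neither set are both rejected.
    public static String validate(Map<String, String> props) {
        boolean hasSchema = props.containsKey(FORMAT_SCHEMA);
        boolean hasJsonSchema = props.containsKey(FORMAT_JSON_SCHEMA);
        if (hasSchema && hasJsonSchema) {
            return "A definition of both a schema and JSON schema is not allowed.";
        } else if (!hasSchema && !hasJsonSchema) {
            return "A definition of a schema or JSON schema is required.";
        }
        return "ok";
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(FORMAT_JSON_SCHEMA, "{\"type\": \"object\"}");
        System.out.println(validate(props)); // → ok
    }
}
```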




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374589#comment-16374589
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170290993
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
 ---
@@ -18,48 +18,67 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
 import org.apache.flink.types.Row
 import org.junit.Test
 
-class RowtimeTest extends DescriptorTestBase {
+import scala.collection.JavaConverters._
 
-  @Test
-  def testRowtime(): Unit = {
-val desc = Rowtime()
-  .timestampsFromField("otherField")
-  .watermarksPeriodicBounding(1000L)
-val expected = Seq(
-  "rowtime.0.version" -> "1",
-  "rowtime.0.timestamps.type" -> "from-field",
-  "rowtime.0.timestamps.from" -> "otherField",
-  "rowtime.0.watermarks.type" -> "periodic-bounding",
-  "rowtime.0.watermarks.delay" -> "1000"
-)
-verifyProperties(desc, expected)
-  }
+class RowtimeTest extends DescriptorTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWatermarkType(): Unit = {
-verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
+verifyInvalidProperty(descriptors().get(0), "rowtime.watermarks.type", "xxx")
--- End diff --

use constant instead of `"rowtime.watermarks.type"`




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374586#comment-16374586
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170271213
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -32,11 +32,36 @@ import scala.collection.mutable
   */
 class Schema extends Descriptor {
 
+  private var deriveFields: Option[String] = None
+
  // maps a field name to a list of properties that describe type, origin, and the time attribute
  private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()
 
   private var lastField: Option[String] = None
 
+  /**
+* Derives field names and types from a preceding connector or format. Additional fields that
+* are defined in this schema extend the derived fields. The derived fields are
+* added in an alphabetical order according to their field name.
+*/
+  def deriveFieldsAlphabetically(): Schema = {
--- End diff --

I think we should support inferring the format from the schema rather than 
the schema from the format.
This would be more aligned with how it would work in a `CREATE TABLE` 
statement and how Hive is doing it for example. We should still support to 
define the format explicitly though.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374579#comment-16374579
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256878
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
+  val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized"
--- End diff --

`rowtime.watermarks.custom.serialized`?




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374578#comment-16374578
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256073
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
--- End diff --

`rowtime.timestamps.custom.class`




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374574#comment-16374574
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230041
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
--- End diff --

Remove Scala equivalent?




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374577#comment-16374577
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170257084
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
--- End diff --

BOUNDING -> BOUNDED




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374587#comment-16374587
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170272030
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -67,14 +92,188 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
 object SchemaValidator {
 
   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_PROPERTY_VERSION = "schema.property-version"
+  val SCHEMA_FIELDS = "schema.fields"
+  val SCHEMA_FIELDS_NAME = "name"
+  val SCHEMA_FIELDS_TYPE = "type"
+  val SCHEMA_FIELDS_PROCTIME = "proctime"
+  val SCHEMA_FIELDS_FROM = "from"
+  val SCHEMA_DERIVE_FIELDS = "schema.derive-fields"
+  val SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY = "alphabetically"
+  val SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY = "sequentially"
+
+  // utilities
+
+  /**
+* Derives a schema from properties and source.
+*/
+  def deriveSchema(
+  properties: DescriptorProperties,
+  sourceSchema: Option[TableSchema])
+: TableSchema = {
+
+val builder = TableSchema.builder()
+
+val schema = properties.getTableSchema(SCHEMA_FIELDS)
+
+val derivationMode = properties.getString(SCHEMA_DERIVE_FIELDS)
+
+val sourceNamesAndTypes = derivationMode match {
+  case Some(SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY) if sourceSchema.isDefined =>
+// sort by name
+sourceSchema.get.getColumnNames
+  .zip(sourceSchema.get.getTypes)
+  .sortBy(_._1)
+
+  case Some(SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY) if sourceSchema.isDefined =>
+sourceSchema.get.getColumnNames.zip(sourceSchema.get.getTypes)
+
+  case Some(_) =>
+throw new ValidationException("Derivation of fields is not supported from this source.")
+
+  case None =>
+Array[(String, TypeInformation[_])]()
+}
+
+// add source fields
+sourceNamesAndTypes.foreach { case (n, t) =>
+  builder.field(n, t)
+}
+
+// add schema fields
+schema.foreach { ts =>
+  val schemaNamesAndTypes = ts.getColumnNames.zip(ts.getTypes)
+  schemaNamesAndTypes.foreach { case (n, t) =>
+  // do not allow overwriting
+  if (sourceNamesAndTypes.exists(_._1 == n)) {
+throw new ValidationException(
+  "Specified schema fields must not overwrite fields derived 
from the source.")
+  }
+  builder.field(n, t)
+  }
+}
+
+builder.build()
+  }
+
+  /**
+* Derives a schema from properties and source.
+* This method is intended for Java code.
+*/
+  def deriveSchema(
+  properties: DescriptorProperties,
+  sourceSchema: Optional[TableSchema])
+: TableSchema = {
+deriveSchema(
+  properties,
+  Option(sourceSchema.orElse(null)))
+  }
+
+  /**
+* Finds the proctime attribute if defined.
+*/
+  def deriveProctimeAttribute(properties: DescriptorProperties): Option[String] = {
+val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME)
+
+for (i <- 0 until names.size) {
+  val isProctime = properties.getBoolean(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_PROCTIME")
+  isProctime.foreach { isSet =>
+if (isSet) {
+  return names.get(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME")
+}
+  }
+}
+None
+  }
+
+  /**
+* Finds the proctime attribute if defined.
+* This method is intended for Java code.
+*/
+  def deriveProctimeOptional(properties: DescriptorProperties): Optional[String] = {
+Optional.ofNullable(deriveProctimeAttribute(properties).orNull)
+  }
+
+  /**
+* Finds the rowtime attributes if defined.
+*/
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+: util.List[RowtimeAttributeDescriptor] = {
+
+val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME)
+
+var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+// check for rowtime in every field
+for (i <- 0 until names.size) {
+  RowtimeValidator
+.getRowtimeComponents(properties, s"$SCHEMA_FIELDS.$i.")
+.foreach { case (extractor, strategy) =>
+  // create descriptor
+  

[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374595#comment-16374595
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170298004
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+  def verifyInvalidProperty(
--- End diff --

rename to `addPropertyAndVerify()`
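The suggested `addPropertyAndVerify()` pattern can be sketched as follows; this is only an illustration with hypothetical names (`validate`, `failsWith`), not the actual Flink test base: copy a valid property map, inject one invalid value, and expect validation to fail.

```java
import java.util.HashMap;
import java.util.Map;

public class AddPropertyAndVerify {

    // Stand-in validator: only accepts the two watermark types used here.
    static void validate(Map<String, String> props) {
        String type = props.get("rowtime.watermarks.type");
        if (type != null && !type.equals("periodic-bounding") && !type.equals("from-source")) {
            throw new IllegalArgumentException("unsupported watermark type: " + type);
        }
    }

    // Add the (invalid) property to a copy of the map and verify validation fails.
    static boolean failsWith(Map<String, String> valid, String key, String invalidValue) {
        Map<String, String> copy = new HashMap<>(valid);
        copy.put(key, invalidValue); // inject the invalid value
        try {
            validate(copy);
            return false; // validation unexpectedly passed
        } catch (IllegalArgumentException e) {
            return true; // expected failure
        }
    }
}
```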


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
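The two derivation modes in the `deriveSchema` diff above can be sketched in plain Java; the mode names and the `derive` helper are illustrative stand-ins, not Flink API: "alphabetically" sorts the source fields by name, "sequentially" keeps the source order, and any other mode is rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DeriveFields {

    // source: ordered (field name -> type string) pairs from the table source
    static List<Map.Entry<String, String>> derive(String mode, List<Map.Entry<String, String>> source) {
        switch (mode) {
            case "alphabetically": {
                List<Map.Entry<String, String>> sorted = new ArrayList<>(source);
                sorted.sort(Map.Entry.comparingByKey()); // sort fields by name
                return sorted;
            }
            case "sequentially":
                return source; // keep the source order
            default:
                throw new IllegalArgumentException("Derivation of fields is not supported: " + mode);
        }
    }
}
```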


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374584#comment-16374584
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170257053
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
+  val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized"
+  val ROWTIME_WATERMARKS_DELAY = "rowtime.watermarks.delay"
--- End diff --

`rowtime.watermarks.bounded.delay`?
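The `ROWTIME_WATERMARKS_TYPE_VALUE_*` constants above enumerate the legal values for `rowtime.watermarks.type`. A minimal sketch of how a validator checks such an enumerated property (the `validateWatermarkType` helper is hypothetical; the value strings are from the diff):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class EnumValidation {

    // Allowed values for "rowtime.watermarks.type", per the constants in the diff.
    static final Set<String> WATERMARK_TYPES = new HashSet<>(Arrays.asList(
        "periodic-ascending", "periodic-bounding", "from-source", "custom"));

    // Reject any value outside the allowed set, as a DescriptorValidator would.
    static String validateWatermarkType(String value) {
        if (!WATERMARK_TYPES.contains(value)) {
            throw new IllegalArgumentException("Invalid watermark type: " + value);
        }
        return value;
    }
}
```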




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374580#comment-16374580
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256851
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
--- End diff --

`rowtime.watermarks.custom.class`?




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374592#comment-16374592
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170291034
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
 ---
@@ -18,48 +18,67 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
 import 
org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
 import org.apache.flink.types.Row
 import org.junit.Test
 
-class RowtimeTest extends DescriptorTestBase {
+import scala.collection.JavaConverters._
 
-  @Test
-  def testRowtime(): Unit = {
-val desc = Rowtime()
-  .timestampsFromField("otherField")
-  .watermarksPeriodicBounding(1000L)
-val expected = Seq(
-  "rowtime.0.version" -> "1",
-  "rowtime.0.timestamps.type" -> "from-field",
-  "rowtime.0.timestamps.from" -> "otherField",
-  "rowtime.0.watermarks.type" -> "periodic-bounding",
-  "rowtime.0.watermarks.delay" -> "1000"
-)
-verifyProperties(desc, expected)
-  }
+class RowtimeTest extends DescriptorTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWatermarkType(): Unit = {
-verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
+verifyInvalidProperty(descriptors().get(0), "rowtime.watermarks.type", "xxx")
   }
 
   @Test(expected = classOf[ValidationException])
   def testMissingWatermarkClass(): Unit = {
-verifyMissingProperty("rowtime.0.watermarks.class")
+verifyMissingProperty(descriptors().get(1), "rowtime.watermarks.class")
--- End diff --

use constant




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374591#comment-16374591
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170273904
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
 ---
@@ -47,53 +79,18 @@ class CsvTest extends DescriptorTestBase {
   "format.fields.3.name" -> "field4",
   "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
--- End diff --

Shouldn't this fail because CSV does not support nested data?




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374576#comment-16374576
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256006
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") 
extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
--- End diff --

`rowtime.timestamps.from.field`




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374582#comment-16374582
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170229569
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -128,6 +165,13 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 )
   }
 
+  /**
+* Adds a table schema under the given key. This method is intended for Java code.
+*/
+  def putTableSchema(key: String, nameAndType: JList[JTuple2[String, String]]): Unit = {
--- End diff --

I think we should drop the Scala equivalent method. This is not a public 
API class that needs a shiny Scala interface but should be usable from Java and 
Scala.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374585#comment-16374585
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170232160
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -246,13 +394,93 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 Some(schemaBuilder.build())
   }
 
+  /**
+* Returns a table schema under the given key if it exists.
+*/
+  def getOptionalTableSchema(key: String): Optional[TableSchema] = toJava(getTableSchema(key))
+
+  /**
+* Returns the type information under the given key if it exists.
+*/
+  def getType(key: String): Option[TypeInformation[_]] = {
+properties.get(key).map(TypeStringUtils.readTypeInfo)
+  }
+
+  /**
+* Returns the type information under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalType(key: String): Optional[TypeInformation[_]] = {
+toJava(getType(key))
+  }
+
+  /**
+* Returns a prefix subset of properties.
+*/
+  def getPrefix(prefixKey: String): Map[String, String] = {
+val prefix = prefixKey + '.'
+properties.filterKeys(_.startsWith(prefix)).toSeq.map{ case (k, v) =>
+  k.substring(prefix.length) -> v // remove prefix
+}.toMap
+  }
+
+  /**
+* Returns a prefix subset of properties.
+* This method is intended for Java code.
+*/
+  def getPrefixMap(prefixKey: String): JMap[String, String] = getPrefix(prefixKey).asJava
--- End diff --

I find the different names for methods that do the same confusing. I'd just 
remove the Scala methods.
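The `getPrefix` method in the diff above selects every entry under `"<prefix>."` and strips the prefix from the keys. A minimal Java sketch of that behavior (the class and method here are illustrative, not the Flink implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PrefixSubset {

    // Return all entries whose key starts with "<prefixKey>.", with the prefix removed.
    static Map<String, String> getPrefix(Map<String, String> props, String prefixKey) {
        String prefix = prefixKey + ".";
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue()); // remove prefix
            }
        }
        return out;
    }
}
```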




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374590#comment-16374590
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170273135
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 ---
@@ -41,10 +41,10 @@ trait TableSourceFactory[T] {
 *   - connector.type
 *   - format.type
 *
-* Specified versions allow the framework to provide backwards compatible properties in case of
--- End diff --

(not related to this change) Should we add something like a `priority` 
property to `TableSourceFactory` that determines in which order factories are 
matched. If two factories match, we would use the factory with the higher 
priority.
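The priority idea proposed here (not part of Flink at the time of this discussion) could look roughly like this: each factory exposes a priority, and when several factories match, the one with the highest priority wins. All names below are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class FactoryMatching {

    interface Factory {
        boolean matches(String connectorType);
        int priority();
        String name();
    }

    // Convenience constructor for a fixed-behavior factory.
    static Factory fixed(String type, int prio, String name) {
        return new Factory() {
            public boolean matches(String t) { return t.equals(type); }
            public int priority() { return prio; }
            public String name() { return name; }
        };
    }

    // Among all matching factories, pick the one with the highest priority.
    static Optional<Factory> select(List<Factory> factories, String connectorType) {
        return factories.stream()
            .filter(f -> f.matches(connectorType))
            .max(Comparator.comparingInt(Factory::priority));
    }
}
```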




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374583#comment-16374583
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256141
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
--- End diff --

`rowtime.timestamps.custom.serialized`




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374588#comment-16374588
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170274382
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
--- End diff --

Why is this important?




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374572#comment-16374572
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230028
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -155,6 +199,28 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 }
   }
 
+  /**
+* Adds an indexed sequence of properties (with sub-properties) under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.type = LONG, schema.fields.1.name = test2
+*
+* The arity of each propertyValue must match the arity of propertyKeys.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedFixedProperties(
--- End diff --

Remove Scala equivalent?
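The indexed-property layout described in the Javadoc above flattens an ordered list of fields under a common key as `<key>.<index>.name` / `<key>.<index>.type`. A small sketch under hypothetical names (`putIndexed` is not the Flink method):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IndexedProperties {

    // fields: ordered list of {name, type} pairs, flattened under a common key.
    static Map<String, String> putIndexed(String key, List<String[]> fields) {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            props.put(key + "." + i + ".name", fields.get(i)[0]);
            props.put(key + "." + i + ".type", fields.get(i)[1]);
        }
        return props;
    }
}
```

For example, two fields produce the four entries shown in the Javadoc: `schema.fields.0.name`, `schema.fields.0.type`, `schema.fields.1.name`, `schema.fields.1.type`.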




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374573#comment-16374573
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230912
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
+  key: String,
+  propertySets: JList[JMap[String, String]])
+: Unit = {
+checkNotNull(key)
+checkNotNull(propertySets)
+putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
+  }
+
  // --------------------------------------------------
 
+  /**
+* Returns a string value under the given key if it exists.
+*/
   def getString(key: String): Option[String] = {
 properties.get(key)
   }
 
-  def getCharacter(key: String): Option[Character] = getString(key) match {
-case Some(c) =>
-  if (c.length != 1) {
-throw new ValidationException(s"The value of $key must only contain one character.")
-  }
-  Some(c.charAt(0))
+  /**
+* Returns a string value under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalString(key: String): Optional[String] = toJava(getString(key))
 
-case None => None
+  /**
+* Returns a character value under the given key if it exists.
+*/
+  def getCharacter(key: String): Option[Character] = getString(key).map { c =>
+if (c.length != 1) {
+  throw new ValidationException(s"The value of $key must only contain one character.")
+}
+c.charAt(0)
   }
 
-  def getBoolean(key: String): Option[Boolean] = getString(key) match {
-case Some(b) => Some(JBoolean.parseBoolean(b))
-
-case None => None
+  /**
+* Returns a class value under the given key if it exists.
+*/
+  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
+properties.get(key).map { name =>
+  val clazz = try {
+Class.forName(
+  name,
+  true,
+  Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]]
+  } catch {
+case e: Exception =>
+  throw new ValidationException(s"Coult not get class for key '$key'.", e)
--- End diff --

typo: Could
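For readers skimming the diff above: the indexed-property scheme that `putIndexedVariableProperties` documents (a list of property sets flattened into `schema.fields.0.name`-style keys, where the arity of the sets may differ) can be illustrated with a small self-contained sketch. Class and method names here are illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IndexedPropertiesDemo {

    // Flattens a list of property sets into indexed keys such as
    // "schema.fields.0.name" -> "test", mirroring the Javadoc above.
    static Map<String, String> putIndexed(String key, List<Map<String, String>> propertySets) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < propertySets.size(); i++) {
            for (Map.Entry<String, String> e : propertySets.get(i).entrySet()) {
                result.put(key + "." + i + "." + e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map<String, String>> sets = new ArrayList<>();
        Map<String, String> f0 = new LinkedHashMap<>();
        f0.put("type", "INT");
        f0.put("name", "test");
        Map<String, String> f1 = new LinkedHashMap<>();
        f1.put("name", "test2"); // arity may differ per property set
        sets.add(f0);
        sets.add(f1);
        System.out.println(putIndexed("schema.fields", sets));
    }
}
```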


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374571#comment-16374571
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170224845
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---
@@ -83,10 +84,32 @@ protected JsonRowDeserializationSchema getDeserializationSchema() {
 
@Override
public String explainSource() {
-   return "KafkaJSONTableSource";
+   return "KafkaJsonTableSource";
}
 
-    SETTERS FOR OPTIONAL PARAMETERS
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (!(o instanceof KafkaJsonTableSource)) {
+   return false;
+   }
+   if (!super.equals(o)) {
+   return false;
+   }
+   KafkaJsonTableSource that = (KafkaJsonTableSource) o;
+   return failOnMissingField == that.failOnMissingField &&
+   Objects.equals(jsonSchema, that.jsonSchema) &&
+   Objects.equals(fieldMapping, that.fieldMapping);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, failOnMissingField);
--- End diff --

`TableSchema` does not override `hashCode()`
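The problem fhueske points at — feeding a class that overrides `equals()` but not `hashCode()` into `Objects.hash(...)` — can be reproduced with a minimal stand-in (the `SchemaLike` class below is hypothetical, not Flink code):

```java
import java.util.Objects;

public class HashCodeGapDemo {

    // Stand-in for a class that overrides equals() but not hashCode(),
    // as TableSchema did at the time of this review.
    static final class SchemaLike {
        final String field;
        SchemaLike(String field) { this.field = field; }
        @Override public boolean equals(Object o) {
            return o instanceof SchemaLike && ((SchemaLike) o).field.equals(field);
        }
        // hashCode() intentionally NOT overridden -> identity hash is used
    }

    public static void main(String[] args) {
        SchemaLike a = new SchemaLike("name");
        SchemaLike b = new SchemaLike("name");
        // equals() says the two instances are the same ...
        System.out.println(a.equals(b)); // true
        // ... but Objects.hash falls back to identity hashCode(), so equal
        // objects very likely hash differently, breaking the equals/hashCode
        // contract for any hash-based container.
        System.out.println(Objects.hash(a) == Objects.hash(b)); // very likely false
    }
}
```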




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374100#comment-16374100
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5505




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374098#comment-16374098
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170180103
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
+  key: String,
+  propertySets: JList[JMap[String, String]])
+: Unit = {
+checkNotNull(key)
+checkNotNull(propertySets)
+putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
+  }
+
   // ----------------------------------------------------------------------
 
+  /**
+* Returns a string value under the given key if it exists.
+*/
   def getString(key: String): Option[String] = {
 properties.get(key)
   }
 
-  def getCharacter(key: String): Option[Character] = getString(key) match {
-case Some(c) =>
-  if (c.length != 1) {
-throw new ValidationException(s"The value of $key must only contain one character.")
-  }
-  Some(c.charAt(0))
+  /**
+* Returns a string value under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalString(key: String): Optional[String] = toJava(getString(key))
 
-case None => None
+  /**
+* Returns a character value under the given key if it exists.
+*/
+  def getCharacter(key: String): Option[Character] = getString(key).map { c =>
+if (c.length != 1) {
+  throw new ValidationException(s"The value of $key must only contain one character.")
+}
+c.charAt(0)
   }
 
-  def getBoolean(key: String): Option[Boolean] = getString(key) match {
-case Some(b) => Some(JBoolean.parseBoolean(b))
-
-case None => None
+  /**
+* Returns a class value under the given key if it exists.
+*/
+  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
--- End diff --

It seems we also need a `getOptionalClass()` wrapper for it. Additionally, 
I think it's a little strange to provide the superclass as a parameter here.
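One plausible reason for the `superClass` parameter xccui questions here is to verify, at lookup time, that the dynamically loaded class really is a subtype of the expected type. A self-contained sketch of that idea (method name `lookupClass` and the exception type are illustrative, not the Flink API):

```java
import java.util.Optional;

public class ClassLookupDemo {

    // Loads a class by name via the context classloader and checks it against
    // an expected supertype before handing it out with a typed signature.
    static <T> Optional<Class<? extends T>> lookupClass(String name, Class<T> superClass) {
        try {
            Class<?> clazz = Class.forName(
                name, true, Thread.currentThread().getContextClassLoader());
            if (!superClass.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                    "Class " + name + " is not a subtype of " + superClass.getName());
            }
            return Optional.of(clazz.asSubclass(superClass));
        } catch (ClassNotFoundException e) {
            // Missing class -> empty result instead of a typed Class object
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // java.lang.String implements CharSequence -> present
        System.out.println(lookupClass("java.lang.String", CharSequence.class).isPresent());
        // Unknown class name -> empty Optional
        System.out.println(lookupClass("com.example.DoesNotExist", Object.class).isPresent());
    }
}
```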




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374097#comment-16374097
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170191830
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.formats.json.JsonSchemaConverter;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.Json;
+import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactory;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KafkaJsonTableSourceFactory}.
+ */
+public abstract class KafkaJsonTableSourceFactoryTestBase {
+
+   private static final String JSON_SCHEMA =
+   "{" +
+   "  'title': 'Fruit'," +
+   "  'type': 'object'," +
+   "  'properties': {" +
+   "'name': {" +
+   "  'type': 'string'" +
+   "}," +
+   "'count': {" +
+   "  'type': 'integer'" +
+   "}," +
+   "'time': {" +
+   "  'description': 'Age in years'," +
+   "  'type': 'number'" +
+   "}" + "  }," +
+   "  'required': ['name', 'count', 'time']" +
+   "}";
+
+   private static final String TOPIC = "test-topic";
+
+   protected abstract String version();
+
+   protected abstract KafkaJsonTableSource.Builder builder();
+
+   protected abstract KafkaJsonTableSourceFactory factory();
+
+   @Test
+   public void testResultingTableSource() {
+
+   // construct table source using a builder
+
+   final Map<String, String> tableJsonMapping = new HashMap<>();
+   tableJsonMapping.put("fruit-name", "name");
+   tableJsonMapping.put("count", "count");
+   tableJsonMapping.put("event-time", "time");
+
+   final Properties props = new Properties();
+   props.put("group.id", "test-group");
+   props.put("bootstrap.servers", "localhost:1234");
+
+   final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+   specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
+   specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
+
+   final KafkaTableSource builderSource = builder()
+   .forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(JSON_SCHEMA)))
+   .failOnMissingField(true)
+   .withTableToJsonMapping(tableJsonMapping)
+   .withKafkaProperties(props)
+   .forTopic(TOPIC)
+   .fromSpecificOffsets(specificOffsets)
+   .withSchema(
+   TableSchema.builder()
+   .field("fruit-name", 

[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374092#comment-16374092
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170188551
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafeRemove(removeProperty)
 validator().validate(properties)
   }
 }
+
+class TestTableSourceDescriptor(connector: ConnectorDescriptor)
--- End diff --

` this.connectorDescriptor = Some(connector)`




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374095#comment-16374095
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170175378
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafeRemove(removeProperty)
 validator().validate(properties)
   }
 }
+
+class TestTableSourceDescriptor(connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
--- End diff --

Too many arguments.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374093#comment-16374093
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170185039
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * The validator for {@link Kafka}.
+ */
+public class KafkaValidator extends ConnectorDescriptorValidator {
+
+   public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
+   public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
+   public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
+   public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
+   public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
+   public static final String CONNECTOR_TOPIC = "connector.topic";
+   public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+   public static final String CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+   public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
+   public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION = "partition";
+   public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
+   public static final String CONNECTOR_PROPERTIES = "connector.properties";
--- End diff --

If we take the required properties (e.g., bootstrap.servers, group.id) as common ones here, the validation logic is pushed down to the underlying components, right?
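The `CONNECTOR_STARTUP_MODE_VALUE_*` constants in the diff above define a closed set of allowed values. A minimal, self-contained sketch of enum-style validation over that set (method name and exception type are illustrative, not the actual Flink validator API):

```java
import java.util.Arrays;
import java.util.List;

public class StartupModeValidatorDemo {

    // The allowed values for "connector.startup-mode" as listed in KafkaValidator.
    static final List<String> STARTUP_MODES = Arrays.asList(
        "earliest-offset", "latest-offset", "group-offsets", "specific-offsets");

    // Rejects any value outside the closed set of startup modes.
    static void validateStartupMode(String value) {
        if (!STARTUP_MODES.contains(value)) {
            throw new IllegalArgumentException(
                "Unsupported value '" + value + "' for connector.startup-mode. "
                    + "Allowed: " + STARTUP_MODES);
        }
    }

    public static void main(String[] args) {
        validateStartupMode("earliest-offset"); // passes silently
        try {
            validateStartupMode("from-the-middle");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```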




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374096#comment-16374096
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170175473
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafeRemove(removeProperty)
 validator().validate(properties)
   }
 }
+
+class TestTableSourceDescriptor(connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
+
+  def addFormat(format: FormatDescriptor): TestTableSourceDescriptor = {
+this.formatDescriptor = Some(format)
+this
+  }
+
+  def addSchema(schema: Schema): TestTableSourceDescriptor = {
+this.schemaDescriptor = Some(schema)
+this
+  }
+
+  def getPropertyMap: java.util.Map[String, String] = {
+val props = new DescriptorProperties()
+connectorDescriptor.addProperties(props)
--- End diff --

`addProperties()` has been removed and this method `getPropertyMap()` seems 
to be useless now.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374091#comment-16374091
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170194561
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala ---
@@ -27,13 +27,27 @@ class ConnectorDescriptorValidator extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
 properties.validateString(CONNECTOR_TYPE, isOptional = false, minLen = 1)
-properties.validateInt(CONNECTOR_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+properties.validateInt(CONNECTOR_PROPERTY_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
   }
 }
 
 object ConnectorDescriptorValidator {
 
+  /**
+* Key for describing the type of the connector. Usually used for factory discovery.
+*/
   val CONNECTOR_TYPE = "connector.type"
+
+  /**
+*  Key for describing the property version. This property can be used for backwards
--- End diff --

Two spaces here...




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374094#comment-16374094
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170181909
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala ---
@@ -43,7 +42,7 @@ object TableSourceFactoryService extends Logging {
   def findTableSourceFactory(descriptor: TableSourceDescriptor): TableSource[_] = {
--- End diff --

Rename to `findAndCreateTableSource()`?




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373044#comment-16373044
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5564
  
What do you think @xccui @fhueske?




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373037#comment-16373037
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5564

[FLINK-8538] [table] Add a Kafka table source factory with JSON format

## What is the purpose of the change

This PR is a continuation of #5505. Since this is the first connector with 
format, time attributes, and table source factory, I encountered many 
inconsistencies that I try to fix in this PR.


## Brief change log

- Change property `connector.version` to `connector.property-version` in 
order to use `version` for things like Kafka
- Add more utility functions for better Java<->Scala interoperability
- Add full rowtime support
- Derive JSON mapping from schema
- Derive schema from source if required
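The "derive from schema" steps above work on flattened, indexed property keys such as `schema.0.name`. A minimal sketch of how a proctime attribute could be found in such properties (the key names mirror the PR's `SchemaValidator`, but the class and method below are illustrative, not the actual Flink implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: scan indexed schema properties ("schema.<i>.name",
// "schema.<i>.proctime") and return the name of the proctime field, if any.
public class ProctimeSketch {

    static Optional<String> deriveProctimeAttribute(Map<String, String> props) {
        for (int i = 0; props.containsKey("schema." + i + ".name"); i++) {
            if ("true".equals(props.get("schema." + i + ".proctime"))) {
                return Optional.of(props.get("schema." + i + ".name"));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("schema.0.name", "user_id");
        props.put("schema.0.type", "BIGINT");
        props.put("schema.1.name", "ts");
        props.put("schema.1.type", "TIMESTAMP");
        props.put("schema.1.proctime", "true");
        if (!deriveProctimeAttribute(props).equals(Optional.of("ts"))) {
            throw new AssertionError("expected proctime attribute 'ts'");
        }
        System.out.println(deriveProctimeAttribute(props)); // Optional[ts]
    }
}
```

The same indexed-key scan generalizes to rowtime descriptors, which attach extractor and watermark-strategy properties under the same `schema.<i>.` prefix.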


## Verifying this change

Various unit tests implemented.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs/ScalaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8538

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5564


commit bc6af3a27155651622149cf4a74e52118773471a
Author: Xingcan Cui 
Date:   2018-02-12T10:11:36Z

[FLINK-8538][table]Add a Kafka table source factory with JSON format support

commit 233a8ff0b88359dd702cfd6e447e0585e411c8c2
Author: Timo Walther 
Date:   2018-02-19T12:35:45Z

[FLINK-8538] [table] Improve unified table sources






[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368158#comment-16368158
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5505
  
Yes @twalthr, that will be great! I'll start working on the 
`KafkaAvroTableSourceFactory` and keep an eye on the API refactorings.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368151#comment-16368151
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5505
  
Thanks for the PR @xccui. You are right, we should provide more utility 
methods in `DescriptorProperties`, especially for improving the Scala<->Java 
interoperability. I will fix those weaknesses next week and see if I can 
simplify the logic in your PR.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-16 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16367293#comment-16367293
 ] 

Timo Walther commented on FLINK-8538:
-

Sorry for my impatience. Thank you for your efforts! I will review it asap.



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-16 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366956#comment-16366956
 ] 

Xingcan Cui commented on FLINK-8538:


Sorry for the late PR [~twalthr]. I was just here waiting for the CI results. 
It's kind of you to help look through it.
Thanks, Xingcan



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366928#comment-16366928
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5505

[FLINK-8538][table]Add a Kafka table source factory with JSON format support

## What is the purpose of the change

This PR adds Kafka JSON table source factories for different Kafka versions.

## Brief change log

- Adds a `Kafka` connector descriptor and a corresponding `KafkaValidator`.
- Adds a `KafkaJsonTableSourceFactory` and different version specific 
implementations.
- Adds a method to get the column numbers in `TableSchema`.
- Adds `equals()` and `hashCode()` methods for `KafkaTableSource` and 
`KafkaJsonTableSource`.

**Note:** the rowtime setting has not been implemented yet as I think a 
more friendly API to get the rowtime attributes should be provided in 
`DescriptorProperties`.

## Verifying this change

This change can be verified by the tests added in 
`KafkaJsonTableFromDescriptorTestBase` and the other sub-classes. However, they 
are temporarily commented out.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (**yes**)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8538

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5505.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5505


commit 3da847507b65047a3fd02058596f6f712a9332de
Author: Xingcan Cui 
Date:   2018-02-12T10:11:36Z

[FLINK-8538][table]Add a Kafka table source factory with JSON format support






[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-16 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366925#comment-16366925
 ] 

Timo Walther commented on FLINK-8538:
-

[~xccui] any news on this? If you find no time for it in the next days, just 
open a PR with your current status and I can finish your work.



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-14 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364488#comment-16364488
 ] 

Timo Walther commented on FLINK-8538:
-

[~xccui] I just opened a PR for FLINK-8630. Looking for a reviewer if you are 
interested :)



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-14 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363802#comment-16363802
 ] 

Xingcan Cui commented on FLINK-8538:


Hi [~twalthr], I think Friday will be fine. Though it looks awkward, I've 
transformed the Scala code to Java. Looking forward to your PR : )



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-14 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363724#comment-16363724
 ] 

Timo Walther commented on FLINK-8538:
-

[~xccui] good point, maybe we should make those classes more Java-friendly then. 
Everything that is used outside of {{flink-table}} should be Java-like. That's 
why {{TableSourceFactory}} uses Java classes, for instance. Btw. I will open a 
PR for the new JSON parser and module {{flink-json}}. What is your time 
schedule? Do you think you can finish this until Friday (your timezone) such 
that I can finalize it and merge it before the feature freeze?



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-14 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363700#comment-16363700
 ] 

Xingcan Cui commented on FLINK-8538:


BTW, it may not be a good idea to implement them in Java since the base classes 
(e.g., {{ConnectorDescriptorValidator}} and {{ConnectorDescriptor}}) are all 
implemented in Scala, which means the API contains lots of Scala features 
(e.g., function parameters and Scala collections). What do you think?



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-12 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360725#comment-16360725
 ] 

Xingcan Cui commented on FLINK-8538:


Thanks for the comments, [~twalthr] and [~fhueske]. Yes, {{flink-kafka}} is just 
a temporary module and I'll remove it once I find proper locations for the code. 
It would surely be better if we could unify all the version/format specific 
Kafka connectors. However, that may not be a short-term effort, so I'll adapt to 
the current implementations. I think my next task would be FLINK-8537, so feel 
free to assign FLINK-8630.



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-12 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360630#comment-16360630
 ] 

Fabian Hueske commented on FLINK-8538:
--

Hi, I agree with [~twalthr]. This looks mostly good, but we shouldn't add a new 
module. Also, it would be good to implement the code in Java as the project 
tries to reduce its dependency on Scala.

How about the following?
* the Kafka descriptor -> flink-connector-kafka-base
* a format independent (but version specific) KafkaTableSource -> 
flink-connector-kafka*version*
* format and version specific TableSourceFactories -> 
flink-connector-kafka*version*
* service discovery files -> flink-connector-kafka*version*

The format and version specific TableSourceFactories will be replaced by format 
independent factories once the format factory infrastructure is in place.
We can also go for version specific descriptors, if that makes more sense.
Since most of the changes would be in version-specific modules, I'd start with 
one module first (Kafka 0.11) and port the change later to earlier versions 
once we agreed on the interfaces. Version independent code can also be 
extracted into abstract classes in flink-connector-kafka-base. 



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-12 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360606#comment-16360606
 ] 

Timo Walther commented on FLINK-8538:
-

Thanks for working on this [~xccui]. I had a quick look at it. Yes, I think you 
are heading in the right direction. I don't think we need a new module. We can also add 
this to {{flink-kafka-base}} but the question is if we want one descriptor for 
all Kafka connectors or make them version specific. Actually I'm in favor of 
just one descriptor but since connector interfaces change from version to 
version, it might be better to have {{Kafka010}} as a descriptor. What do you 
think [~fhueske]? I started working on FLINK-8558. Are you planning to work on 
FLINK-8630? Otherwise I would assign it to me. Would be great if we can get 
this in for Flink 1.5.



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-12 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360559#comment-16360559
 ] 

Xingcan Cui commented on FLINK-8538:


Hi [~twalthr] and [~fhueske], I committed a WIP branch to my repository 
([xccui/FLINK-8538|https://github.com/xccui/flink/tree/FLINK-8538]).

A temporary new module {{flink-kafka}} was created for this work since I still 
haven't figured out how to consolidate the code. There are still some features 
to be added, e.g., support for time attributes and schema mapping. Considering 
that the framework has already been constructed, I wonder if you could have a 
look to see if I'm heading in the right direction.

Thanks, Xingcan



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-09 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358494#comment-16358494
 ] 

Xingcan Cui commented on FLINK-8538:


Thanks for the comments [~twalthr] and [~fhueske]. Actually, I've encountered 
the problem of parsing the NUMBER type before; I just added an extra field to 
indicate the numeric type, as the JSON schema in my application was only used 
internally. I think Option 2, which Timo raised, refers to a global config, 
i.e., we always return a specific type for the JSON NUMBER type, which could be 
configured to be double or BigDecimal. I'll create a ticket for the JSON schema 
converter. Thanks!



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358372#comment-16358372
 ] 

Fabian Hueske commented on FLINK-8538:
--

Regarding the options:

* Option 1: This was my initial idea. But Timo is right, we would tie the 
format to the Table API which is not a good choice, IMO.
* Option 2: I don't think this will work. What if a schema contains two NUMBER 
types?
* Option 3: How would this help? We would have a custom number type that nobody 
can properly handle.

I think the best approach to support JSON Schema would be to always return 
BigDecimal types.



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-09 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358295#comment-16358295
 ] 

Timo Walther commented on FLINK-8538:
-

[~xccui] yes, I think this is necessary. We also have to think about how to 
handle JSON-specific types. E.g., the JSON standard declares a "Number" type, but 
we have to map it to some Java primitive. It may also declare union types. We 
have the following options:

Option 1: We infer the type using information from the {{TableSchema}} (but 
this would be Table API specific, formats are intended for all APIs).

Option 2: We make this configurable: number as double, number as BigDecimal etc.

Option 3: We introduce a new TypeInformation.

If we really want to support JSON once and for all, we have to think about how 
to handle those cases. I just read a discussion on the Beam ML about this:
https://lists.apache.org/thread.html/ee6843859f1ddb1d4544c32d255fe88a3bf3aec97d3afc3e3d47c701@%3Cdev.beam.apache.org%3E
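Option 2 above can be sketched as a single global switch. This is a hedged illustration only: the enum and method names below are made up for the example and are not Flink API.

```java
import java.math.BigDecimal;

// Hypothetical sketch of "make it configurable": one global mode decides
// which Java type every JSON NUMBER literal is mapped to.
public class NumberMappingSketch {

    enum NumberMode { AS_DOUBLE, AS_BIG_DECIMAL }

    static Object parseNumber(String literal, NumberMode mode) {
        switch (mode) {
            case AS_DOUBLE:
                return Double.parseDouble(literal); // fast, may lose precision
            case AS_BIG_DECIMAL:
            default:
                return new BigDecimal(literal);     // exact, heavier objects
        }
    }

    public static void main(String[] args) {
        Object d = parseNumber("3.14", NumberMode.AS_DOUBLE);
        Object b = parseNumber("3.14", NumberMode.AS_BIG_DECIMAL);
        System.out.println(d.getClass().getSimpleName()
                + " / " + b.getClass().getSimpleName()); // Double / BigDecimal
    }
}
```

The trade-off is visible in the two branches: doubles are cheap but cannot represent every JSON number exactly, while BigDecimal preserves precision at the cost of object allocation per record.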



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-09 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358286#comment-16358286
 ] 

Xingcan Cui commented on FLINK-8538:


Hi [~twalthr], I think we need a converter to transform a standard JSON schema 
to a {{TypeInformation}}, right?



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-07 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355523#comment-16355523
 ] 

Timo Walther commented on FLINK-8538:
-

[~xccui] the nice thing about the SPI is that any JAR file can declare a 
{{META-INF/services/org.apache.flink.table.sources.TableSourceFactory}}. So you 
can add it to {{flink-kafka}} and Java will discover it.
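The discovery Timo describes is Java's standard `ServiceLoader` SPI. A minimal, self-contained illustration (the interface below is a stand-in for Flink's actual {{TableSourceFactory}}; run without any registered providers it simply finds nothing):

```java
import java.util.ServiceLoader;

// Sketch of SPI discovery: any JAR on the classpath can contribute an
// implementation by listing its class name in a file named
// META-INF/services/<fully-qualified interface name>.
public class SpiSketch {

    public interface TableSourceFactory {
        boolean matches(java.util.Map<String, String> properties);
    }

    public static void main(String[] args) {
        // Iterates over every implementation registered for the interface.
        ServiceLoader<TableSourceFactory> loader =
                ServiceLoader.load(TableSourceFactory.class);
        int count = 0;
        for (TableSourceFactory f : loader) {
            System.out.println("discovered: " + f.getClass().getName());
            count++;
        }
        // 0 here, unless some JAR on the classpath registers a provider.
        System.out.println("factories found: " + count);
    }
}
```

This is why no module cycle is strictly required: the interface lives in one module, while each connector JAR ships its own provider-configuration file and implementation.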



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-07 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355511#comment-16355511
 ] 

Xingcan Cui commented on FLINK-8538:


If I understood correctly, maybe we should create a new module 
{{flink-table-connector}} and move all the table source/sink interfaces (e.g., 
{{Descriptor}}, {{TableSource}}) that are currently used by different 
connector modules from {{flink-table}} to this new module. That way, all 
other related modules would depend only on this new module.
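In POM terms, a connector module would then declare something roughly like the following (the module name {{flink-table-connector}} is only a suggestion from this thread, not a decided artifact id):

```xml
<!-- In a connector module's pom.xml: depend on the shared
     interface module instead of on flink-table itself. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-connector</artifactId>
  <version>${project.version}</version>
</dependency>
```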



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-07 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355468#comment-16355468
 ] 

Xingcan Cui commented on FLINK-8538:


Hi [~twalthr] and [~fhueske], there is still a question about the dependencies. IMO, 
if we want to unify all the table source factories, they must be added to the 
{{SPI}} configuration file 
({{META-INF/services/org.apache.flink.table.sources.TableSourceFactory}}), 
which is located in the {{flink-table}} subproject. That means the 
{{KafkaJsonTableSourceFactory}} in {{flink-kafka}} would also have to be registered 
there, which creates a module cycle between {{flink-table}} and {{flink-kafka}}. 
What do you think of that problem?



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-05 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353508#comment-16353508
 ] 

Xingcan Cui commented on FLINK-8538:


Thanks for your reply, [~fhueske]. I'll think it over.



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-05 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352657#comment-16352657
 ] 

Fabian Hueske commented on FLINK-8538:
--

Hi [~xccui], please see my comments:
 # Yes, the idea is to make this quite modular, not only for users who 
implement other connectors, but also for us. For instance, we'd like to reuse 
code for JSON formats when reading from Kafka, Kinesis, Pravega, files, etc.
 # This is not clear yet and will be driven by user demand. We'll focus on the 
most popular systems for which there are source functions that we can wrap 
(Kafka, Kinesis, file system, etc.).
 # I think so too. Most of the existing sources (which are not too many) would 
need to be refactored to become more modular. For instance, it does not really 
make sense to have a Kafka table source for each combination of Kafka version 
and format (Avro, JSON). This will become hard to maintain once we add more 
formats like ProtoBuf, CSV, or ...
 # The current version does not feature format factories, but this is something 
we are thinking about adding soon as well. A format factory would need to 
provide a format parser that parses data in the format and also returns the 
{{TypeInformation}} of the returned type.
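A minimal, self-contained sketch of that format-factory idea follows. All names here are hypothetical illustrations, not Flink API, and a plain {{Class}} token stands in for {{TypeInformation}} so the example compiles without Flink dependencies.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical sketch of the format-factory idea from this discussion.
public class FormatFactorySketch {

    // A parser turns raw payload bytes into typed records.
    interface FormatParser<T> { T parse(byte[] payload); }

    // A factory matches on string properties and yields a parser plus the
    // (stand-in) type information of the parsed records.
    interface FormatFactory<T> {
        boolean supports(Map<String, String> properties);
        FormatParser<T> createParser(Map<String, String> properties);
        Class<T> getReturnType(Map<String, String> properties);
    }

    // Trivial example format: UTF-8 text payloads parsed to String.
    static final FormatFactory<String> TEXT = new FormatFactory<String>() {
        public boolean supports(Map<String, String> p) {
            return "text".equals(p.get("format.type"));
        }
        public FormatParser<String> createParser(Map<String, String> p) {
            return bytes -> new String(bytes, StandardCharsets.UTF_8);
        }
        public Class<String> getReturnType(Map<String, String> p) {
            return String.class;
        }
    };

    public static void main(String[] args) {
        Map<String, String> props = Map.of("format.type", "text");
        if (TEXT.supports(props)) {
            String out = TEXT.createParser(props)
                             .parse("hello".getBytes(StandardCharsets.UTF_8));
            System.out.println(out); // prints "hello"
        }
    }
}
```

A connector would only hold a reference to the factory interface, so new formats could be contributed without touching the connector code.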

It's great that you want to help with this effort! It would be great if we 
could have a first {{KafkaJsonTableSourceFactory}} soon, so that we have a Kafka 
source that can be used with the SQL CLI client. Starting from the 
+CsvTableSourceFactory+ sounds like a good idea to me. We should make sure, 
though, that the things I discussed above can later be refactored without 
breaking too much of the API.

Thanks, Fabian



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-05 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352634#comment-16352634
 ] 

Xingcan Cui commented on FLINK-8538:


Thanks for the explanation, [~twalthr]. I've glanced over the PR for FLINK-8240 
and tried to comprehend the work.

1. We are going to provide a unified interface, described by general string 
properties, to define table sources and sinks. Ideally, users will be able to 
combine descriptors along different dimensions (such as type, format, and 
schema) to build their own table sources/sinks in the future, right?

2. Do you have any plans for how many table source/sink factories we are going 
to support?

3. I get the feeling that we will have to do some refactoring of the existing 
connectors, since the source types and formats seem to be heavily coupled.

4. When you mentioned {{format}} discovery, did you mean implementing different 
{{FormatDescriptors}} and matching them just like the {{SPI}} is used for the 
{{TableSourceFactories}}?

I'd like to help implement the whole feature :)  For now, maybe I can take 
the {{CsvTableSourceFactory}} as a model and imitate it to implement an 
elementary {{KafkaTableSourceFactory}}.

What do you think?
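For reference, the string-property view of such a descriptor-defined Kafka/JSON source might look roughly like this (all keys and values below are illustrative guesses, not the final property names):

```
connector.type = kafka
connector.topic = orders
format.type = json
schema.0.name = user
schema.0.type = VARCHAR
schema.1.name = ts
schema.1.type = TIMESTAMP
```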



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-05 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1635#comment-1635
 ] 

Timo Walther commented on FLINK-8538:
-

[~xccui] thanks for working on this issue. This issue depends on general 
changes we want to make regarding formats. I will open further issues as 
blockers. The idea is to have some means to perform {{format}} discovery for 
Avro, JSON, Protobuf, XML, etc., similar to how the file system discovery 
works now. The {{KafkaTableSource}} factory would be located in {{flink-kafka}} 
and instantiate the needed format. {{flink-kafka}} will only have an optional 
dependency on {{flink-table}}, as it does at the moment.



[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-03 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351405#comment-16351405
 ] 

Xingcan Cui commented on FLINK-8538:


Hi [~twalthr], thanks for opening this.

I just have one small question. Do you think it's proper to add the new code 
to the {{flink-table}} subproject and make it depend on the other Kafka-related 
subprojects (e.g., {{flink-connector-kafka-0.9}})?
