[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395581#comment-16395581
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173888555
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
@@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     // construct table source using a builder
 
     final Map<String, String> tableJsonMapping = new HashMap<>();
+    tableJsonMapping.put("name", "name");
--- End diff --

```
physical schema ==mapping=> "intermediate schema" ==timestamp extraction and projection=> logical schema
```
Maybe we should consider eliminating the "intermediate schema" in the future?
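As a rough illustration of that pipeline, here is a minimal Java sketch. It assumes rows are plain `Map<String, Object>` values and that the mapping is keyed by intermediate field name with the physical field name as value; `SchemaPipelineSketch`, `applyMapping`, and `extractAndProject` are hypothetical names, not Flink API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of: physical schema ==mapping=> intermediate schema
// ==timestamp extraction and projection=> logical schema.
// Rows are plain maps; none of these names are Flink API.
class SchemaPipelineSketch {

    // Stage 1: apply the field mapping (intermediate name -> physical name).
    static Map<String, Object> applyMapping(Map<String, Object> physical,
                                            Map<String, String> mapping) {
        Map<String, Object> intermediate = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : mapping.entrySet()) {
            intermediate.put(e.getKey(), physical.get(e.getValue()));
        }
        return intermediate;
    }

    // Stage 2: read the rowtime from rowtimeSource in the intermediate row,
    // store it under rowtimeName, and project the remaining logical fields.
    static Map<String, Object> extractAndProject(Map<String, Object> intermediate,
                                                 String rowtimeSource,
                                                 String rowtimeName,
                                                 List<String> logicalFields) {
        Map<String, Object> logical = new LinkedHashMap<>();
        for (String field : logicalFields) {
            if (field.equals(rowtimeName)) {
                logical.put(field, intermediate.get(rowtimeSource));
            } else {
                logical.put(field, intermediate.get(field));
            }
        }
        return logical;
    }

    public static void main(String[] args) {
        Map<String, Object> physical = new LinkedHashMap<>();
        physical.put("rideId", 42L);
        physical.put("rideTime", 1520000000000L);

        // rideTime must be in the mapping, otherwise stage 2 cannot see it.
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("rideId", "rideId");
        mapping.put("rideTime", "rideTime");

        Map<String, Object> logical = extractAndProject(
            applyMapping(physical, mapping), "rideTime", "rowTime",
            List.of("rideId", "rowTime"));
        System.out.println(logical); // {rideId=42, rowTime=1520000000000}
    }
}
```

If `rideTime` were left out of the mapping, the intermediate row would not contain it and the extracted `rowTime` would be `null` in this sketch, which mirrors the failure described in FLINK-8854.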


> Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
> -
>
> Key: FLINK-8854
> URL: https://issues.apache.org/jira/browse/FLINK-8854
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not correct.
> It should include not only all fields of the table schema but also all 
> fields of the format schema (mapped to themselves). Otherwise, it is not 
> possible to use a timestamp extractor on a field that is not in the table schema. 
> For example, this configuration would fail:
> {code}
> sources:
>   - name: TaxiRides
>     schema:
>       - name: rideId
>         type: LONG
>       - name: rowTime
>         type: TIMESTAMP
>         rowtime:
>           timestamps:
>             type: "from-field"
>             from: "rideTime"
>           watermarks:
>             type: "periodic-bounded"
>             delay: "6"
>     connector:
>       
>     format:
>       property-version: 1
>       type: json
>       schema: "ROW(rideId LONG, rideTime TIMESTAMP)"
> {code}
> because {{rideTime}} is not in the table schema.
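A minimal Java sketch of the mapping the description asks for, assuming the table schema is modeled as a list of fields with an optional `from` alias and a time-attribute flag, and the format schema as a list of field names. `FieldMappingSketch` and `TableField` are hypothetical; this is not the actual `SchemaValidator.deriveFieldMapping()` implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the mapping requested in FLINK-8854; not Flink code.
class FieldMappingSketch {

    // A table schema field: its name, an optional alias ("from"), and whether
    // it is a time attribute (rowtime/proctime) without a physical source.
    static final class TableField {
        final String name;
        final String from;          // null if there is no explicit alias
        final boolean timeAttribute;

        TableField(String name, String from, boolean timeAttribute) {
            this.name = name;
            this.from = from;
            this.timeAttribute = timeAttribute;
        }
    }

    // Returns a map from table field name to format field name.
    static Map<String, String> deriveFieldMapping(List<TableField> tableSchema,
                                                  List<String> formatFields) {
        Map<String, String> mapping = new LinkedHashMap<>();
        // 1) every format field is mapped to itself, so a timestamp
        //    extractor can reference fields such as rideTime
        for (String f : formatFields) {
            mapping.put(f, f);
        }
        // 2) table fields: explicit alias, implicit identity, or skipped
        for (TableField t : tableSchema) {
            if (t.from != null) {
                mapping.put(t.name, t.from);
            } else if (!t.timeAttribute) {
                mapping.put(t.name, t.name);
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        List<TableField> schema = List.of(
            new TableField("rideId", null, false),
            new TableField("rowTime", null, true));
        Map<String, String> mapping =
            deriveFieldMapping(schema, List.of("rideId", "rideTime"));
        System.out.println(mapping); // {rideId=rideId, rideTime=rideTime}
    }
}
```

Time attributes such as `rowTime` are skipped here because they have no physical counterpart; the identity entries for the format fields are what make `rideTime` reachable for the `from-field` extractor.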



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395556#comment-16395556
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173881141
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
@@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     // construct table source using a builder
 
     final Map<String, String> tableJsonMapping = new HashMap<>();
+    tableJsonMapping.put("name", "name");
--- End diff --

@fhueske what do you think about this whole mapping business?




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395545#comment-16395545
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173879267
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
@@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     // construct table source using a builder
 
     final Map<String, String> tableJsonMapping = new HashMap<>();
+    tableJsonMapping.put("name", "name");
--- End diff --

Well, according to the current implementation, you are right. But I still 
feel uncomfortable about that since we actually mix the physical schema (format 
schema) and the logical schema (table schema) into the same map. Do you think 
it's necessary to make some changes here?




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395476#comment-16395476
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173863375
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
@@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     // construct table source using a builder
 
     final Map<String, String> tableJsonMapping = new HashMap<>();
+    tableJsonMapping.put("name", "name");
--- End diff --

The question is: what should a rowtime attribute field (or a custom 
extractor) reference, the input or the current schema? I think it should 
reference the input; thus all fields (even the renamed ones) need to be present 
in the mapping.




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395467#comment-16395467
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173860901
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
@@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     // construct table source using a builder
 
     final Map<String, String> tableJsonMapping = new HashMap<>();
+    tableJsonMapping.put("name", "name");
--- End diff --

This "name" to "name" mapping should not exist since we've already 
explicitly defined the "fruit-name" to "name" mapping.




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395455#comment-16395455
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173857218
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
@@ -148,6 +148,13 @@ object SchemaValidator {
 
     val schema = properties.getTableSchema(SCHEMA)
 
+    // add all source fields first because rowtime might reference one of them
+    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
--- End diff --

I think we should remove the already added source field before adding 
the explicit mapping in the following snippet:
```
// add explicit mapping
case Some(source) =>
  // should add mapping.remove(source)
  mapping.put(name, source)
```
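To illustrate the suggested order of operations, here is a hypothetical Java sketch. It assumes map keys are table field names and values are format field names (as in the test's `tableJsonMapping`), and that aliases are given as table field -> `from` field; `ExplicitMappingSketch` and `derive` are illustrative names only. With `fruit-name` derived from `name`, the extra `name -> name` entry disappears.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the suggested remove-before-put; not Flink code.
class ExplicitMappingSketch {

    // aliases: table field name -> format field it is derived from ("from")
    static Map<String, String> derive(List<String> formatFields,
                                      Map<String, String> aliases) {
        Map<String, String> mapping = new LinkedHashMap<>();
        // add all source (format) fields first, mapped to themselves
        for (String f : formatFields) {
            mapping.put(f, f);
        }
        // add explicit mappings, dropping the identity entry of the source
        for (Map.Entry<String, String> alias : aliases.entrySet()) {
            mapping.remove(alias.getValue()); // the suggested mapping.remove(source)
            mapping.put(alias.getKey(), alias.getValue());
        }
        return mapping;
    }

    public static void main(String[] args) {
        Map<String, String> aliases = new LinkedHashMap<>();
        aliases.put("fruit-name", "name");
        System.out.println(derive(List.of("name", "count"), aliases));
        // {count=count, fruit-name=name}
    }
}
```

The trade-off raised in the thread applies here: a rowtime `from` that refers to the renamed input field `name` would no longer find it in such a mapping.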




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395220#comment-16395220
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173791129
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
@@ -148,6 +148,13 @@ object SchemaValidator {
 
     val schema = properties.getTableSchema(SCHEMA)
 
+    // add all source fields first because rowtime might reference one of them
+    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
--- End diff --

Not really. I was just refactoring #5610. For convenience, I used the 
existing class `org.apache.flink.formats.avro.generated.User` in a test case, 
but it has so many fields that need to be mapped.




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395195#comment-16395195
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173786510
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
@@ -148,6 +148,13 @@ object SchemaValidator {
 
     val schema = properties.getTableSchema(SCHEMA)
 
+    // add all source fields first because rowtime might reference one of them
+    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
--- End diff --

Yes @xccui, we could change the logic like this. Do you have cases where we 
need such behavior?




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395187#comment-16395187
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173784883
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
@@ -148,6 +148,13 @@ object SchemaValidator {
 
     val schema = properties.getTableSchema(SCHEMA)
 
+    // add all source fields first because rowtime might reference one of them
+    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
--- End diff --

Hi @twalthr, can we check the used `TimestampExtractor` here? Specifically, 
if it's an `ExistingField`, we only include the target fields; if it's a 
`StreamRecordTimestamp`, we don't include extra fields; and only if it's a 
custom extractor do we include all the source fields.
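The proposal can be sketched in Java as below. The extractor types are modeled by a hypothetical interface hierarchy that only borrows the names `ExistingField` and `StreamRecordTimestamp` from the comment; it is not Flink's `TimestampExtractor` API, and `fieldsToInclude` is an illustrative name.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the proposal; class names mirror the comment only.
class ExtractorFieldsSketch {

    interface Extractor {}

    // Reads the timestamp from named input fields.
    static final class ExistingField implements Extractor {
        final List<String> fields;
        ExistingField(List<String> fields) { this.fields = fields; }
    }

    // Uses the timestamp of the stream record; needs no input field.
    static final class StreamRecordTimestamp implements Extractor {}

    // Any user-defined extractor whose inputs are unknown.
    static final class CustomExtractor implements Extractor {}

    // Returns the source fields that would be added to the field mapping.
    static Set<String> fieldsToInclude(Extractor extractor, List<String> sourceFields) {
        Set<String> result = new LinkedHashSet<>();
        if (extractor instanceof ExistingField) {
            // only the referenced fields
            result.addAll(((ExistingField) extractor).fields);
        } else if (!(extractor instanceof StreamRecordTimestamp)) {
            // unknown (custom) extractor: include all source fields
            result.addAll(sourceFields);
        }
        // StreamRecordTimestamp: no extra fields
        return result;
    }

    public static void main(String[] args) {
        List<String> source = List.of("rideId", "rideTime");
        System.out.println(fieldsToInclude(new ExistingField(List.of("rideTime")), source)); // [rideTime]
        System.out.println(fieldsToInclude(new StreamRecordTimestamp(), source));           // []
        System.out.println(fieldsToInclude(new CustomExtractor(), source));                 // [rideId, rideTime]
    }
}
```

The `instanceof ExistingField` check only stays reliable if users cannot subclass `ExistingField`, which is why the sketch declares it `final`.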




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393985#comment-16393985
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5662
  
Thanks for the explanation, @twalthr! I'll update the PR and resolve the 
resulting conflicts.




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392937#comment-16392937
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5662
  
Thanks for the comments, @xccui. It's never too late for feedback. Sorry, 
maybe I merged this too quickly. We still need to call 
`builder.forJsonSchema()` if the schema contains a `proctime` attribute. The 
most common use case will be to extend the format with time attributes. With your 
approach, the format would contain an additional timestamp that is definitely 
not part of the format schema.




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392933#comment-16392933
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173465435
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
@@ -198,14 +205,20 @@ object SchemaValidator {
       val isProctime = properties
         .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
         .orElse(false)
-      val isRowtime = properties
-        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
+      val isRowtime = properties.containsKey(tsType)
       if (!isProctime && !isRowtime) {
         // check for a aliasing
         val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
           .orElse(n)
         builder.field(fieldName, t)
       }
+      // only use the rowtime attribute if it references a field
+      else if (isRowtime &&
+          properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
--- End diff --

You are right, we should declare `ExistingField` `final`. In the custom 
extractor case, a user has to supply the format manually. Maybe we will need some 
explanation logic in the future so that a user can see what the derived format 
looks like and whether it makes sense to declare it explicitly.
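The rule in the diff above (regular fields go into the derived format under their `from` alias or their own name, proctime fields are skipped, and a rowtime field contributes only when its timestamps type is `from-field`) can be sketched in Java as follows. The property layout is simplified to a hypothetical per-field object, and adding the referenced field with the rowtime's type is an assumption about the part of the diff that is cut off; `FormatFieldsSketch` is not Flink code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of deriving format fields from table schema fields.
class FormatFieldsSketch {

    static final class SchemaField {
        final String name;
        final String type;
        final String from;            // alias of a regular field, or null
        final boolean proctime;
        final String timestampsType;  // e.g. "from-field"; null if not rowtime
        final String timestampsFrom;  // field referenced by a from-field rowtime

        SchemaField(String name, String type, String from, boolean proctime,
                    String timestampsType, String timestampsFrom) {
            this.name = name;
            this.type = type;
            this.from = from;
            this.proctime = proctime;
            this.timestampsType = timestampsType;
            this.timestampsFrom = timestampsFrom;
        }
    }

    // Returns format field name -> type, in schema order.
    static Map<String, String> deriveFormatFields(List<SchemaField> schema) {
        Map<String, String> format = new LinkedHashMap<>();
        for (SchemaField f : schema) {
            boolean isRowtime = f.timestampsType != null;
            if (!f.proctime && !isRowtime) {
                // regular field: use the alias if one is given
                format.put(f.from != null ? f.from : f.name, f.type);
            } else if (isRowtime && "from-field".equals(f.timestampsType)) {
                // assumption: the referenced field keeps the rowtime's type
                format.put(f.timestampsFrom, f.type);
            }
            // proctime and other rowtime types add nothing to the format
        }
        return format;
    }

    public static void main(String[] args) {
        System.out.println(deriveFormatFields(List.of(
            new SchemaField("rideId", "LONG", null, false, null, null),
            new SchemaField("rowTime", "TIMESTAMP", null, false, "from-field", "rideTime"),
            new SchemaField("procTime", "TIMESTAMP", null, true, null, null))));
        // {rideId=LONG, rideTime=TIMESTAMP}
    }
}
```

For the TaxiRides example from the issue, this yields the same fields as the explicitly declared `ROW(rideId LONG, rideTime TIMESTAMP)` format; a custom extractor adds nothing here, so the format would have to be supplied manually, as stated above.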




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392913#comment-16392913
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5662#discussion_r173455747
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
@@ -198,14 +205,20 @@ object SchemaValidator {
       val isProctime = properties
         .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
         .orElse(false)
-      val isRowtime = properties
-        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
+      val isRowtime = properties.containsKey(tsType)
       if (!isProctime && !isRowtime) {
         // check for a aliasing
         val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
           .orElse(n)
         builder.field(fieldName, t)
       }
+      // only use the rowtime attribute if it references a field
+      else if (isRowtime &&
+          properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
--- End diff --

What if the user uses the custom extractor to define his/her own 
`ExistingField` extractor that references a field?




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392850#comment-16392850
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5662




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392843#comment-16392843
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5662
  
Merging...




[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391163#comment-16391163
 ] 

ASF GitHub Bot commented on FLINK-8854:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5662

[FLINK-8854] [table] Fix schema mapping with time attributes

## What is the purpose of the change

This PR fixes the invalid field mapping and improves the mapping of time 
attributes in general.

## Brief change log

- `SchemaValidator.deriveFieldMapping()` and `SchemaValidator.deriveFormatFields()`


## Verifying this change

- Existing tests extended

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8854

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5662.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5662


commit 45c1ca57b65004fa973b663e441848ab228ade3e
Author: Timo Walther 
Date:   2018-03-08T10:51:38Z

[FLINK-8854] [table] Fix schema mapping with time attributes



