[jira] [Comment Edited] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-12-09 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17455065#comment-17455065
 ] 

Jingsong Lee edited comment on FLINK-20370 at 12/10/21, 2:11 AM:
-

Fixed via:

 

part1: Fix wrong results when sink primary key is not the same with query 
result's changelog upsert key

master: f8f6935adc841529ecdc0636174650cffbf73719

release-1.14: 7c5ddbd201005e55ab68b4db7ee74c7cbeb13400

release-1.13: 54aaffc7d9b2b06726294cf636d1b9acf74c5d49

 

part2: introduce 'table.exec.sink.keyed-shuffle' option to auto keyby on sink's 
pk if parallelism are not the same for insertOnly input

master: 9e76585f1fa110288604913a73d86ac2f1542777

 

part2 is not cherry-picked to 1.14 because it may affect the normal plan and 
lead to incompatibility.


was (Author: lzljs3620320):
Fixed via:

 

part1: Fix wrong results when sink primary key is not the same with query 
result's changelog upsert key

master: f8f6935adc841529ecdc0636174650cffbf73719

release-1.14: 7c5ddbd201005e55ab68b4db7ee74c7cbeb13400

 

part2: introduce 'table.exec.sink.keyed-shuffle' option to auto keyby on sink's 
pk if parallelism are not the same for insertOnly input

master: 9e76585f1fa110288604913a73d86ac2f1542777

 

part2 does not cherry-pick to 1.14 because it may affect the normal plan and 
lead to incompatibility.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> Both sources are upsert-kafka which synchronizes the changes from MySQL 
> tables (source_city, source_customer). The sink is another MySQL table which 
> is in upsert mode with "city_name" primary key. The join key is "city_id". 
> In this case, the result will be wrong when updating 
> {{source_city.city_name}} column in MySQL, as the UPDATE_BEFORE is ignored 
> and the old city_name is retained in the sink table. 
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>    +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>       :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>       :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
>       :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>       :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
>       :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>       :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
>       :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
>       :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>       :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
>       +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>          +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
>             +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>                +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
> The documentation already advises users to use the key of the query as the 
> primary key on the sink: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> We should highlight this warning more prominently on the CREATE TABLE page. 
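
A minimal Python sketch of the failure mode described above. It is not Flink code: the sink is modelled as a dict keyed by city_name, and the rows and city names ("Berlin", "Munich") are made-up illustrations. Because the upsert sink drops UPDATE_BEFORE, renaming a city leaves the row for the old name behind.

```python
# Toy model of an upsert sink whose primary key is city_name.
# Row kinds use the plan's abbreviations: I, UB, UA, D.

def apply_to_upsert_sink(changelog, honor_update_before=False):
    """Apply (kind, city_name, count_customer) rows to a dict keyed by city_name."""
    table = {}
    for kind, city_name, count_customer in changelog:
        if kind in ("I", "UA"):
            table[city_name] = count_customer   # upsert on the primary key
        elif kind == "D" or (kind == "UB" and honor_update_before):
            table.pop(city_name, None)          # delete by primary key
        # otherwise UPDATE_BEFORE is silently ignored, as an upsert sink does
    return table

# Hypothetical data: the city is renamed from "Berlin" to "Munich".
changelog = [
    ("I", "Berlin", 3),
    ("UB", "Berlin", 3),
    ("UA", "Munich", 3),
]

stale = apply_to_upsert_sink(changelog)
fixed = apply_to_upsert_sink(changelog, honor_update_before=True)
print(stale)  # {'Berlin': 3, 'Munich': 3}  <- old city_name is retained
print(fixed)  # {'Munich': 3}
```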



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-24 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441679#comment-17441679
 ] 

lincoln lee edited comment on FLINK-20370 at 11/25/21, 6:08 AM:


Thanks for all your inputs!
Based on Jingsong's summary and the discussion, let me try to summarize; please 
correct me if I'm wrong.

An upsert sink can accept the following inputs:
1. input is a changelog stream 
1.1 primary key = upsert key: this already works 
1.2 primary key != upsert key (the upsert key can be none): we should add 
upsertMaterialize
1.3 primary key contains upsert key: upsertMaterialize can be omitted, but the 
sink requires update_before

2. input is an append stream
2.1 the sink has the same parallelism as the upstream operator: this is fine
2.2 the sink's parallelism differs from the upstream operator: we should add a 
'keyby' on the primary key by default

The current PR already addresses 1.2 and 1.3, so only 2.2 remains to be done. 
The fix is simple, but to keep it configurable 
we should introduce another job-level config option similar to 
'table.exec.sink.upsert-materialize' (since the fine-grained per-INSERT INTO 
setting from FLINK-24254 is not ready yet). 
I have temporarily named it 'table.exec.sink.pk-shuffle' and updated the PR; 
your suggestions are welcome.
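
The case analysis in cases 1.1 to 2.2 can be written down as a small decision function. This is only a Python sketch of one reading of the list, not the planner's code; the function name, parameters and returned strings are hypothetical, and "contains" in 1.3 is read as "the upsert key is a proper subset of the primary key".

```python
def sink_plan_adjustment(input_is_append_only, primary_key, upsert_key,
                         sink_parallelism, upstream_parallelism):
    """Return what should be added in front of an upsert sink (cases 1.1-2.2)."""
    pk = set(primary_key)
    if input_is_append_only:
        if sink_parallelism == upstream_parallelism:
            return "none"                      # 2.1: same parallelism
        return "keyby(primary key)"            # 2.2: different parallelism
    uk = set(upsert_key or ())                 # the upsert key may be none
    if uk and uk == pk:
        return "none"                          # 1.1: keys are equal
    if uk and uk < pk:
        return "require update_before"         # 1.3: pk contains upsert key
    return "upsertMaterialize"                 # 1.2: keys differ or no upsert key


# The join from the issue description: upsert key city_id, sink pk city_name.
print(sink_plan_adjustment(False, ["city_name"], ["city_id"], 4, 4))
# upsertMaterialize
```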

cc [~twalthr] [~lzljs3620320] [~wenlong.lwl] 

 

 


was (Author: lincoln.86xy):
Thanks for all your inputs!
Based on Jingson's summary and the discussion, let me try to summarize, please 
correct me if I'm wrong

An upsert sink can accept such inputs:
1. input is changelog stream 
1.1 primary key = upsert key, it's already ok 
1.2 primary key != upsert key (upsert key can be none), we should add 
upsertMaterialize
1.3 primary key contains upsert key, upsertMaterialize can be committed but 
sink requires update_before

2. input is append stream
2.1 sink has same parallelism with the upstream operator, it's ok
2.2 sink's parallelism differs from the upstream operator, we should add a 
'keyby' for the primary key by default

The current pr already addressed 1.2 & 1.3, so remaining the 2.2 to be done. 
The fix is simple, but for the sake of be configurable, 
we should introduce another job level config option similar to 
'table.exec.sink.upsert-materialize' (since FLINK-24254 fine-grained setting 
per INSERT INTO not ready now) 
I temporally name it 'table.exec.sink.pk-shuffle',  and updated the pr, welcome 
your suggestions.

cc [~twalthr] [~lzljs3620320] [~wenlong.lwl] 

 

 

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>

[jira] [Comment Edited] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-09 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412497#comment-17412497
 ] 

Timo Walther edited comment on FLINK-20370 at 9/9/21, 10:36 AM:


Even in case 2, if users declare a PRIMARY KEY for Kafka the same keys should 
end up in the same partition. Distributed disorder should only be allowed if 
the sink does not declare a PRIMARY KEY. We should aim for correctness and add 
a keyBy for case 2 and enable UpsertMaterialize for case 3. Of course, this can 
be disabled if necessary.

I mentioned it in a different issue already, but we should also introduce hints 
that allow those important settings for sources and sinks fine-grained per 
INSERT INTO. I will open an issue for that if it doesn't exist yet.
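
To illustrate why a keyBy on the primary key matters, here is a small Python sketch. It is a toy model, not Flink's partitioner: `subtask_for` uses CRC32 as a stand-in for the real key hashing, and the records and parallelism of 3 are made up. With round-robin distribution, rows for one key are spread over several sink subtasks and may be written out of order; with a keyed shuffle, each key is handled by exactly one subtask.

```python
import zlib


def subtask_for(key, parallelism):
    """Deterministic stand-in for a hash partitioner (not Flink's key groups)."""
    return zlib.crc32(key.encode("utf-8")) % parallelism


def route(records, parallelism, keyed):
    """Assign (key, value) records to sink subtasks, keyed or round-robin."""
    subtasks = [[] for _ in range(parallelism)]
    for i, (key, value) in enumerate(records):
        target = subtask_for(key, parallelism) if keyed else i % parallelism
        subtasks[target].append((key, value))
    return subtasks


def subtasks_per_key(subtasks):
    """Map each key to the set of subtask indexes that received it."""
    seen = {}
    for index, rows in enumerate(subtasks):
        for key, _ in rows:
            seen.setdefault(key, set()).add(index)
    return seen


records = [("a", 1), ("b", 1), ("a", 2), ("b", 2), ("a", 3)]

spread_round_robin = subtasks_per_key(route(records, 3, keyed=False))
spread_keyed = subtasks_per_key(route(records, 3, keyed=True))

print(spread_round_robin["a"])  # {0, 1, 2}: updates for "a" can be reordered
print(all(len(s) == 1 for s in spread_keyed.values()))  # True
```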


was (Author: twalthr):
Even in case 2, if users declare a PRIMARY KEY for Kafka the same keys should 
end up in the same partition. Distributed disorder should only be allowed if 
the sink does not declare a PRIMARY KEY. We should aim for correctness and add 
a keyBy for case 1 and enable UpsertMaterialize for case 2. Of course, this can 
be disabled if necessary.

I mentioned it in a different issue already, but we should also introduce hints 
that allow those important settings for sources and sinks fine-grained per 
INSERT INTO. I will open an issue for that if it doesn't exist yet.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Comment Edited] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-09 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412497#comment-17412497
 ] 

Timo Walther edited comment on FLINK-20370 at 9/9/21, 10:31 AM:


Even in case 2, if users declare a PRIMARY KEY for Kafka the same keys should 
end up in the same partition. Distributed disorder should only be allowed if 
the sink does not declare a PRIMARY KEY. We should aim for correctness and add 
a keyBy for case 1 and enable UpsertMaterialize for case 2. Of course, this can 
be disabled if necessary.

I mentioned it in a different issue already, but we should also introduce hints 
that allow those important settings for sources and sinks fine-grained per 
INSERT INTO. I will open an issue for that if it doesn't exist yet.


was (Author: twalthr):
Even in case 2, if users declare a PRIMARY KEY for Kafka the same keys should 
end up in the same partition. Distributed disorder should only be allowed if 
the sink does not declare a PRIMARY KEY. We should aim for correctness and add 
a keyBy for case 1 and enable UpsertMaterialize for case 2. Of course, this can 
be disabled if necessary.

I mentioned it in a different issue already, but we should also introduce hints 
that allow those important settings for sources fine-grained per INSERT INTO. I 
will open an issue for that if it doesn't exist yet.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Comment Edited] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-08 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412302#comment-17412302
 ] 

Jingsong Lee edited comment on FLINK-20370 at 9/9/21, 3:54 AM:
---

Yes, it seems that the only solution is single parallelism.

An upsert sink can accept the following inputs:
# primary key = unique key: this is OK, nothing needs to be done.
# input is append-only: this is sometimes OK. We can only assume that users can 
tolerate some degree of distributed disorder.
# input is a changelog and primary key != unique key (the unique key can be 
none): the most problematic situation.
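
For the third case, one remedy raised elsewhere in this thread is to materialize the changelog per primary key in front of the sink. The Python sketch below only illustrates that idea under stated assumptions; it is not Flink's operator. The function `materialize`, the row layout (city_id, city_name, count) and the sample data are hypothetical, and the input is assumed to carry UPDATE_BEFORE rows.

```python
def materialize(changelog, pk_of):
    """Turn a changelog with retractions into upserts/deletes per primary key."""
    state = {}  # primary key -> rows currently contributing, in arrival order
    out = []
    for kind, row in changelog:
        key = pk_of(row)
        rows = state.setdefault(key, [])
        if kind in ("I", "UA"):
            rows.append(row)
            out.append(("UPSERT", row))
        elif row in rows:  # "UB" or "D": retract one matching row
            was_latest = rows[-1] == row
            del rows[len(rows) - 1 - rows[::-1].index(row)]
            if not rows:
                del state[key]
                out.append(("DELETE", row))
            elif was_latest:
                out.append(("UPSERT", rows[-1]))  # fall back to the previous row
    return out


def apply_to_sink(events, pk_of):
    """Apply UPSERT/DELETE events to a dict keyed by the sink's primary key."""
    table = {}
    for kind, row in events:
        if kind == "UPSERT":
            table[pk_of(row)] = row
        else:
            table.pop(pk_of(row), None)
    return table


def city_name(row):
    return row[1]


# Hypothetical data: city 2 is renamed from "B" to "A" and back to "B".
changelog = [
    ("I", (1, "A", 3)),
    ("I", (2, "B", 5)),
    ("UB", (2, "B", 5)),
    ("UA", (2, "A", 5)),
    ("UB", (2, "A", 5)),
    ("UA", (2, "B", 5)),
]

result = apply_to_sink(materialize(changelog, city_name), city_name)
print(result)  # {'A': (1, 'A', 3), 'B': (2, 'B', 5)}
```

Without the per-key state, the row for "A" would stay at (2, "A", 5) after the second rename, because an upsert sink alone has no way to restore the row from city 1.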


was (Author: lzljs3620320):
Yes, it seems that the only solution is single parallelism.

A upsert sink can accept inputs:
# primary key = unique key, it is OK, nothing needs to do.
# input is append only, sometimes is OK, We can only assume that users can 
allow some degree of distributed disorder.
# input is change log, primary key != unique key (unique key can be null), the 
most problematic situation

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Comment Edited] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-08 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412302#comment-17412302
 ] 

Jingsong Lee edited comment on FLINK-20370 at 9/9/21, 3:51 AM:
---

Yes, it seems that the only solution is single parallelism.

An upsert sink can accept the following inputs:
# primary key = unique key: this is OK, nothing needs to be done.
# input is append-only: this is sometimes OK. We can only assume that users can 
tolerate some degree of distributed disorder.
# input is a changelog and primary key != unique key (the unique key can be 
null): the most problematic situation.


was (Author: lzljs3620320):
Yes, it seems that the only solution is single parallelism.

A upsert sink can accept inputs:
# primary key = unique key, it is OK, nothing needs to do.
# input is append only, sometimes is OK, We can only assume that users can 
allow some degree of distributed disorder.
# primary key != unique key (unique key can be null), the most problematic 
situation

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Comment Edited] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-08 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412302#comment-17412302
 ] 

Jingsong Lee edited comment on FLINK-20370 at 9/9/21, 3:50 AM:
---

Yes, it seems that the only solution is single parallelism.

An upsert sink can accept the following inputs:
# primary key = unique key: this is OK, nothing needs to be done.
# input is append-only: this is sometimes OK. We can only assume that users can 
tolerate some degree of distributed disorder.
# primary key != unique key (the unique key can be null): the most problematic 
situation.


was (Author: lzljs3620320):
Yes, it seems that the only solution is single parallelism.

A upsert sink can accept inputs:
# primary key = unique key, it is OK
# input is append only, sometimes is OK
# primary key != unique key, the most problematic situation

Maybe the third can be disabled. But maybe it will make many situations 
difficult to use. We need a flag...


> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Comment Edited] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2020-11-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239147#comment-17239147
 ] 

Jark Wu edited comment on FLINK-20370 at 11/26/20, 9:32 AM:


[~fsk119], no, this will not be considered, because it has the same semantics 
as a batch query. 
You can run your code above in MySQL, and you will get the same result. 
However, running the example from the JIRA description in MySQL gives a 
different result. 


was (Author: jark):
[~fsk119], no, this will not be considered, because it has the same semantic of 
batch query. 
You can run your above code in MySQL, and you will get the same result. 
However, running the example of JIRA description in MySQL, will get a different 
result. 

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Major
>