[jira] [Commented] (FLINK-29625) Optimize changelog normalize for upsert source

2024-05-14 Thread JJJJude (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846258#comment-17846258
 ] 

JJJJude commented on FLINK-29625:
-

Hi [~jiabao.sun], I encountered the same problem; see the discussion: 
[https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522|http://example.com]

I hope that the optimization of `ChangelogNormalize` can be disabled by an 
option, because in the CDC multi-stream join scenario, `UPDATE_BEFORE` (-U) is 
turned into -D/+I after passing through the `JOIN` operator and is passed 
downstream. When the join result is used as a dimension table, the `-D` causes 
lookups to be associated with the intermediate `NULL` state.

> Optimize changelog normalize for upsert source
> --
>
> Key: FLINK-29625
> URL: https://issues.apache.org/jira/browse/FLINK-29625
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.15.3
>Reporter: jiabao.sun
>Priority: Major
>
> Currently, Flink adds an expensive operator, _changelog normalize_, to 
> sources in upsert changelog mode to fill in the _update_before_ value. 
> Even inserting directly from an upsert-kafka source into an upsert-kafka sink 
> still adds this operator, plus an extra operator to drop the 
> _update_before_ messages, which is obviously redundant.
> In CDC scenarios, some databases do not provide update-before images, such as 
> Cassandra, MongoDB, TiDB (when {_}Old Value{_} is not turned on), and Postgres 
> (when {_}REPLICA IDENTITY{_} is not set to {_}FULL{_}). Using Flink SQL to 
> process these changelogs incurs a lot of state overhead.
> I don't know much about why this operator is needed, so I take the liberty of 
> asking whether we can get rid of changelog normalize completely, or apply it 
> optimistically, adding it only if a normalized changelog is required by a 
> downstream operator.
> If this optimization is worthwhile, I'm happy to help with it.
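
For readers unfamiliar with the operator, here is a minimal Python sketch of 
the idea behind changelog normalization as the description above presents it. 
It is not Flink's implementation: the function name `normalize`, the event 
tuples, and the "UPSERT"/"DELETE" labels are assumptions made for illustration. 
It keeps the last value per key so that the missing _update_before_ row can be 
emitted, and that per-key state is the overhead the issue refers to.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Hypothetical event shapes, for illustration only.
# Input:  (kind, key, value) where kind is "UPSERT" or "DELETE".
# Output: (row_kind, key, value) where row_kind is "+I", "-U", "+U" or "-D".
UpsertEvent = Tuple[str, str, Optional[str]]
ChangelogRow = Tuple[str, str, Optional[str]]


def normalize(events: Iterable[UpsertEvent]) -> List[ChangelogRow]:
    """Turn an upsert stream into a full changelog by remembering the last row per key."""
    state: Dict[str, Optional[str]] = {}  # last seen value per key (the costly part)
    out: List[ChangelogRow] = []
    for kind, key, value in events:
        if kind == "UPSERT":
            if key in state:
                # The source did not send the old image, so rebuild it from state.
                out.append(("-U", key, state[key]))
                out.append(("+U", key, value))
            else:
                out.append(("+I", key, value))
            state[key] = value
        elif kind == "DELETE":
            if key in state:
                # Emit the delete with the full old row, then drop the state.
                out.append(("-D", key, state.pop(key)))
            # A delete for an unknown key has nothing to retract and is skipped.
        else:
            raise ValueError(f"unknown event kind: {kind}")
    return out


if __name__ == "__main__":
    stream = [("UPSERT", "k1", "a"), ("UPSERT", "k1", "b"), ("DELETE", "k1", None)]
    for row in normalize(stream):
        print(row)
```

If the sink only needs upserts, the `-U` rows produced here are discarded 
again, which is the redundancy the description points out.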



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29625) Optimize changelog normalize for upsert source

2022-10-13 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617422#comment-17617422
 ] 

godfrey he commented on FLINK-29625:


+1 for the improvement






[jira] [Commented] (FLINK-29625) Optimize changelog normalize for upsert source

2022-10-13 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617416#comment-17617416
 ] 

Shengkai Fang commented on FLINK-29625:
---

[~jiabao.sun] I think you are right: we should let the downstream operator 
determine whether we need ChangelogNormalize. But could you share some 
input on how you plan to implement this feature?






[jira] [Commented] (FLINK-29625) Optimize changelog normalize for upsert source

2022-10-13 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616934#comment-17616934
 ] 

Martijn Visser commented on FLINK-29625:


[~jark] I think you can answer this question better than me



