[jira] [Commented] (FLINK-29625) Optimize changelog normalize for upsert source
[ https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846258#comment-17846258 ]

ude commented on FLINK-29625:

Hi [~jiabao.sun], I ran into the same problem; see the discussion at [https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522|http://example.com].

I would like this `ChangelogNormalize` optimization to be something that can be disabled through a configuration option. In the CDC multi-stream join scenario, an `UPDATE_BEFORE` (-U) message becomes a -D/+I pair after passing through the `JOIN` operator, and that pair is sent downstream. When the join output is then used as a dimension table, the `-D` can cause lookups to be associated with the intermediate `NULL` state.

> Optimize changelog normalize for upsert source
> --
>
> Key: FLINK-29625
> URL: https://issues.apache.org/jira/browse/FLINK-29625
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.15.3
> Reporter: jiabao.sun
> Priority: Major
>
> Currently, Flink adds an expensive _changelog normalize_ operator after
> sources in upsert changelog mode to complete the _update_before_ values.
> Even inserting directly from an upsert-kafka source into an upsert-kafka sink
> still adds this operator, plus an extra operator that drops the
> _update_before_ messages again, which is obviously redundant.
> In CDC scenarios, some databases do not provide update-before images, such as
> Cassandra, MongoDB, TiDB (when {_}Old Value{_} is not turned on) and Postgres
> (when {_}REPLICA IDENTITY{_} is not set to {_}FULL{_}). Processing these
> changelogs with Flink SQL incurs a lot of state overhead.
> I don't know much about why this operator is needed, so I take the liberty to
> ask whether we can get rid of changelog normalize completely, or apply it
> optimistically, adding it only when a downstream operator requires a
> normalized changelog.
> If this optimization is worthwhile, I'm happy to help with it.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
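To make the cost described in the issue concrete, here is a simplified sketch of what a changelog-normalize step has to do for an upsert source. It is not Flink's actual `ChangelogNormalize` implementation: the class, the `Kind`/`Change` types and the `process`/`dropUpdateBefore` methods are hypothetical, rows are reduced to a single `String` value, and a `null` value is assumed to mean "delete this key". The point it illustrates is that the missing `-U` image can only be produced by keeping the last row per primary key in state, and that a filter discarding `-U` again (like the extra operator the issue mentions ahead of an upsert sink) throws that work away. Requires Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogNormalizeSketch {

    // Row kinds, printed with the short symbols Flink uses (+I, -U, +U, -D).
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String symbol;

        Kind(String symbol) {
            this.symbol = symbol;
        }
    }

    // One changelog message for a primary key (hypothetical simplified row).
    record Change(Kind kind, String key, String value) {
        @Override
        public String toString() {
            return kind.symbol + "(" + key + "," + value + ")";
        }
    }

    // Last row seen per primary key: this keyed state is the overhead the issue describes.
    private final Map<String, String> lastRowByKey = new HashMap<>();

    // Consumes one upsert message; in this sketch a null value stands for a delete of that key.
    List<Change> process(String key, String value) {
        List<Change> out = new ArrayList<>();
        String previous = lastRowByKey.get(key);
        if (value == null) {
            // The upsert delete carries only the key; the stored row fills in the full -D.
            if (previous != null) {
                lastRowByKey.remove(key);
                out.add(new Change(Kind.DELETE, key, previous));
            }
        } else if (previous == null) {
            lastRowByKey.put(key, value);
            out.add(new Change(Kind.INSERT, key, value));
        } else if (!previous.equals(value)) {
            // The stored row becomes the missing update-before image.
            lastRowByKey.put(key, value);
            out.add(new Change(Kind.UPDATE_BEFORE, key, previous));
            out.add(new Change(Kind.UPDATE_AFTER, key, value));
        }
        // An identical repeated upsert produces no output in this sketch.
        return out;
    }

    // Discards -U again, since an upsert sink keyed on the same primary key does not need it.
    static List<Change> dropUpdateBefore(List<Change> in) {
        List<Change> out = new ArrayList<>();
        for (Change c : in) {
            if (c.kind() != Kind.UPDATE_BEFORE) {
                out.add(c);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ChangelogNormalizeSketch normalize = new ChangelogNormalizeSketch();
        List<Change> normalized = new ArrayList<>();
        normalized.addAll(normalize.process("k1", "a"));
        normalized.addAll(normalize.process("k1", "b"));
        normalized.addAll(normalize.process("k1", null));
        System.out.println(normalized);                   // [+I(k1,a), -U(k1,a), +U(k1,b), -D(k1,b)]
        System.out.println(dropUpdateBefore(normalized)); // [+I(k1,a), +U(k1,b), -D(k1,b)]
    }
}
```

In the upsert-kafka to upsert-kafka case, the second line is all the sink needs, which is why paying for the per-key state in the first place looks redundant.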
[jira] [Commented] (FLINK-29625) Optimize changelog normalize for upsert source
[ https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617422#comment-17617422 ]

godfrey he commented on FLINK-29625:

+1 for the improvement
[jira] [Commented] (FLINK-29625) Optimize changelog normalize for upsert source
[ https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617416#comment-17617416 ]

Shengkai Fang commented on FLINK-29625:

[~jiabao.sun] I think you are right: we should let the downstream operators determine whether ChangeLogNormalize is needed. Could you share some details on how you plan to implement this feature?
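The idea of letting downstream operators decide can be pictured as a planner-side check. The sketch below is hypothetical throughout: `UpdateKind`, `Consumer`, `needsChangelogNormalize` and the `forceNormalize` flag are illustrative names, not Flink planner APIs or configuration options. It assumes each consumer of the upsert source can declare whether it needs `-U` messages, and inserts the normalize step only if at least one does; `forceNormalize` stands in for the opt-out option requested earlier in this thread. Requires Java 16+ for records.

```java
import java.util.List;

public class NormalizeDecisionSketch {

    // Hypothetical: which update messages a consumer must receive.
    enum UpdateKind { ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    // Hypothetical summary of one downstream consumer of the upsert source.
    record Consumer(String name, UpdateKind required) {}

    // Adds the normalize step only if some consumer needs -U messages,
    // unless the (hypothetical) user override forces it to stay.
    static boolean needsChangelogNormalize(List<Consumer> consumers, boolean forceNormalize) {
        if (forceNormalize) {
            return true;
        }
        return consumers.stream().anyMatch(c -> c.required() == UpdateKind.BEFORE_AND_AFTER);
    }

    public static void main(String[] args) {
        // Hypothetical example consumers.
        Consumer upsertSink = new Consumer("upsert-kafka sink", UpdateKind.ONLY_UPDATE_AFTER);
        Consumer retractingAgg = new Consumer("aggregation needing retractions", UpdateKind.BEFORE_AND_AFTER);
        System.out.println(needsChangelogNormalize(List.of(upsertSink), false));                // false
        System.out.println(needsChangelogNormalize(List.of(upsertSink, retractingAgg), false)); // true
        System.out.println(needsChangelogNormalize(List.of(upsertSink), true));                 // true
    }
}
```

With only an upsert sink downstream the check returns false, so no per-key state would be kept; adding any consumer that needs retractions, or setting the override, brings the normalize step back.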
[jira] [Commented] (FLINK-29625) Optimize changelog normalize for upsert source
[ https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616934#comment-17616934 ]

Martijn Visser commented on FLINK-29625:

[~jark] I think you can answer this question better than me.