[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808066#comment-17808066 ]

lincoln lee commented on FLINK-33182:
-------------------------------------

[~twalthr] I've submitted a PR for this. If you could help with the review, that would be great.

> Allow metadata columns in NduAnalyzer with ChangelogNormalize
> -------------------------------------------------------------
>
>                 Key: FLINK-33182
>                 URL: https://issues.apache.org/jira/browse/FLINK-33182
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: Timo Walther
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0
>
>
> Currently, the NduAnalyzer is very strict about metadata columns in updating
> sources. However, for upsert sources (like Kafka) that contain an incomplete
> changelog, the planner always adds a ChangelogNormalize node.
> ChangelogNormalize will make sure that metadata columns can be considered
> deterministic. So the NduAnalyzer should be satisfied in this case.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773973#comment-17773973 ]

Timo Walther commented on FLINK-33182:
--------------------------------------

Thanks [~lincoln.86xy]. Let me know if I can help with a review.
[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773700#comment-17773700 ]

lincoln lee commented on FLINK-33182:
-------------------------------------

[~twalthr] Of course, I'll put it on my worklist for the next release.
[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773370#comment-17773370 ]

Timo Walther commented on FLINK-33182:
--------------------------------------

Great that we are on the same page. Yes, I meant the `StreamNonDeterministicUpdatePlanVisitor`. Do you want to take this ticket? I guess you are the expert in this class.
[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773367#comment-17773367 ]

lincoln lee commented on FLINK-33182:
-------------------------------------

[~twalthr] You're right. I checked the current `DeduplicateFunctionHelper#processLastRowOnChangelog` processing, and it does avoid the non-determinism of the -D message. So I agree it's safe in this case, given the guarantee that metadata columns are not allowed as primary key.

By the way, when you mention NduAnalyzer here, do you actually mean `StreamNonDeterministicUpdatePlanVisitor`? NduAnalyzer relies on it, so the visitor is what we should actually modify.
[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773229#comment-17773229 ]

Timo Walther commented on FLINK-33182:
--------------------------------------

[~lincoln.86xy] not sure if I understand your comment correctly. I understand that -D is dangerous and could cause non-determinism. However, ChangelogNormalize does not use the message value of -D. It reconstructs the -D from the previously stored row, as can be seen in `DeduplicateFunctionHelper#processLastRowOnChangelog`. So metadata columns cannot affect determinism in this case.
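The argument in this comment, that the -D is rebuilt from stored state rather than taken from the incoming message, can be illustrated with a minimal sketch. This is not the Flink implementation: the class `ChangelogNormalizeSketch`, the `Row` record, its `offsetMeta` field, and the in-memory map standing in for keyed state are all hypothetical names chosen for illustration, and the real `DeduplicateFunctionHelper#processLastRowOnChangelog` handles further cases (such as state TTL) that are left out here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration only; all names here are hypothetical, not Flink classes.
public class ChangelogNormalizeSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical row shape: primary key, payload, and one metadata column.
    record Row(Kind kind, String key, String value, long offsetMeta) {
        Row withKind(Kind newKind) {
            return new Row(newKind, key, value, offsetMeta);
        }
    }

    // Last row seen per primary key; stands in for Flink's keyed state.
    private final Map<String, Row> lastRowByKey = new HashMap<>();

    List<Row> process(Row in) {
        List<Row> out = new ArrayList<>();
        Row prev = lastRowByKey.get(in.key());
        if (in.kind() == Kind.INSERT || in.kind() == Kind.UPDATE_AFTER) {
            if (prev == null) {
                // First row for this key becomes +I.
                out.add(in.withKind(Kind.INSERT));
            } else {
                // Retract the stored row, then emit the new one.
                out.add(prev.withKind(Kind.UPDATE_BEFORE));
                out.add(in.withKind(Kind.UPDATE_AFTER));
            }
            lastRowByKey.put(in.key(), in);
        } else if (prev != null) {
            // The retraction is built from the stored row. The value and
            // metadata carried by the incoming -D message are ignored.
            out.add(prev.withKind(Kind.DELETE));
            lastRowByKey.remove(in.key());
        }
        return out;
    }

    public static void main(String[] args) {
        ChangelogNormalizeSketch normalize = new ChangelogNormalizeSketch();
        System.out.println(normalize.process(new Row(Kind.UPDATE_AFTER, "k1", "a", 10L)));
        // The incoming delete carries a different metadata value (99).
        System.out.println(normalize.process(new Row(Kind.DELETE, "k1", null, 99L)));
    }
}
```

In this sketch the emitted -D always carries the same metadata value that was emitted earlier for that key, which is the property the comment relies on.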
[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772592#comment-17772592 ]

lincoln lee commented on FLINK-33182:
-------------------------------------

[~twalthr] I've thought about this carefully. For upsert sources, the non-determinism of the metadata column doesn't affect the +U message. However, if there is a -D message, there is no guarantee of correctness once we relax the strict restriction on metadata, since the metadata column in the -D message is carried by the message itself rather than being handled by the ChangelogNormalize node. So, if we want to relax the restriction, we additionally need to make sure that the source does not contain -D messages. WDYT?
[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17771868#comment-17771868 ]

Timo Walther commented on FLINK-33182:
--------------------------------------

[~lincoln.86xy] do you agree with this assumption?