[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2024-01-18 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808066#comment-17808066
 ] 

lincoln lee commented on FLINK-33182:
-

[~twalthr] I've submitted a PR for this. If you could help with the review, 
that would be great.

> Allow metadata columns in NduAnalyzer with ChangelogNormalize
> -
>
> Key: FLINK-33182
> URL: https://issues.apache.org/jira/browse/FLINK-33182
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, the NduAnalyzer is very strict about metadata columns in updating 
> sources. However, for upsert sources (like Kafka) that contain an incomplete 
> changelog, the planner always adds a ChangelogNormalize node. 
> ChangelogNormalize will make sure that metadata columns can be considered 
> deterministic. So the NduAnalyzer should be satisfied in this case. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-11 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773973#comment-17773973
 ] 

Timo Walther commented on FLINK-33182:
--

Thanks [~lincoln.86xy]. Let me know if I can help with a review.



[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-10 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773700#comment-17773700
 ] 

lincoln lee commented on FLINK-33182:
-

[~twalthr] Of course, I'll put it on my worklist for the next release.



[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-09 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773370#comment-17773370
 ] 

Timo Walther commented on FLINK-33182:
--

Great that we are on the same page. Yes, I meant the 
`StreamNonDeterministicUpdatePlanVisitor`. Do you want to take this ticket? I 
guess you are the expert in this class.



[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-09 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773367#comment-17773367
 ] 

lincoln lee commented on FLINK-33182:
-

[~twalthr] You're right. I checked the current 
`DeduplicateFunctionHelper#processLastRowOnChangelog` logic, and it does avoid 
the non-determinism of the -D message. So I agree it's safe in this case, given 
the guarantee that metadata columns are not allowed as primary keys.
By the way, when you mention NduAnalyzer here, do you actually mean 
`StreamNonDeterministicUpdatePlanVisitor`? NduAnalyzer relies on it, so the 
visitor is what we should actually modify.



[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-09 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773229#comment-17773229
 ] 

Timo Walther commented on FLINK-33182:
--

[~lincoln.86xy] not sure if I understand your comment correctly. I understand 
that -D is dangerous and could cause non-determinism. However, 
ChangelogNormalize does not use the message value of -D. It reconstructs the -D 
from the previously stored row as can be seen in 
`DeduplicateFunctionHelper#processLastRowOnChangelog`. So metadata columns 
cannot affect determinism in this case.
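
To illustrate the point above, here is a minimal sketch of "keep the last row 
per key" normalization. It is an assumption-laden model, not Flink's actual 
`DeduplicateFunctionHelper`: the class `NormalizeSketch`, its methods, and the 
string row format are all hypothetical. It only shows that the emitted -D is 
rebuilt from the stored row, so the value and metadata carried by the incoming 
delete message never reach the output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of last-row normalization on a changelog.
// Not Flink code: names, types, and the output format are illustrative only.
class NormalizeSketch {
    // Last seen row per primary key, rendered as "(key,value,metadata)".
    private final Map<String, String> state = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    private static String render(String key, String value, long metadata) {
        return "(" + key + "," + value + "," + metadata + ")";
    }

    // Upsert: emit +I for a new key, otherwise -U (stored row) then +U (new row).
    void upsert(String key, String value, long metadata) {
        String current = render(key, value, metadata);
        String previous = state.put(key, current);
        if (previous == null) {
            output.add("+I" + current);
        } else {
            output.add("-U" + previous);
            output.add("+U" + current);
        }
    }

    // Delete: only the key of the incoming message is used. The emitted -D is
    // the previously stored row, so incoming value and metadata are ignored.
    void delete(String key, String ignoredValue, long ignoredMetadata) {
        String previous = state.remove(key);
        if (previous != null) {
            output.add("-D" + previous);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        NormalizeSketch n = new NormalizeSketch();
        n.upsert("k1", "a", 100L);
        n.upsert("k1", "b", 200L);
        // The delete carries different metadata (999), which is never emitted.
        n.delete("k1", null, 999L);
        System.out.println(n.output());
        // prints [+I(k1,a,100), -U(k1,a,100), +U(k1,b,200), -D(k1,b,200)]
    }
}
```

In this model the metadata value 999 on the delete message is dropped, which 
mirrors the argument that metadata columns cannot affect determinism once the 
normalization step sits between the source and downstream operators.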



[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-06 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772592#comment-17772592
 ] 

lincoln lee commented on FLINK-33182:
-

[~twalthr] I've thought about this carefully. For upsert sources, the 
non-determinism of the metadata column doesn't affect the +U message. However, 
if there is a -D message, correctness is not guaranteed once we relax the 
strict restriction on metadata, since the metadata column in the -D message is 
carried by the message itself rather than being handled by the 
ChangelogNormalize node. So, if we want to relax the restriction, we need to 
additionally make sure that the source does not contain -D messages. WDYT?



[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-04 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17771868#comment-17771868
 ] 

Timo Walther commented on FLINK-33182:
--

[~lincoln.86xy] do you agree with this assumption?
