Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-24 Thread via GitHub


lincoln-lil merged PR #24166:
URL: https://github.com/apache/flink/pull/24166


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-23 Thread via GitHub


lincoln-lil merged PR #24167:
URL: https://github.com/apache/flink/pull/24167





Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-22 Thread via GitHub


flinkbot commented on PR #24167:
URL: https://github.com/apache/flink/pull/24167#issuecomment-1903947584

   
   ## CI report:
   
   * 5667c5b74d23be1d5ed949729a5864f24ff30081 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-22 Thread via GitHub


flinkbot commented on PR #24166:
URL: https://github.com/apache/flink/pull/24166#issuecomment-1903946231

   
   ## CI report:
   
   * 19f63b20dac0bb49738e1a8c78a5935ce8e3d0cb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-22 Thread via GitHub


lincoln-lil opened a new pull request, #24167:
URL: https://github.com/apache/flink/pull/24167

   This is a backport PR of [FLINK-34166](https://issues.apache.org/jira/browse/FLINK-34166) into the release-1.17 branch.





[PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-22 Thread via GitHub


lincoln-lil opened a new pull request, #24166:
URL: https://github.com/apache/flink/pull/24166

   This is a backport PR of FLINK-34166 into the release-1.18 branch.





Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-21 Thread via GitHub


lincoln-lil merged PR #24152:
URL: https://github.com/apache/flink/pull/24152





Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-21 Thread via GitHub


luoyuxia commented on code in PR #24152:
URL: https://github.com/apache/flink/pull/24152#discussion_r1461300877


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java:
##
@@ -129,40 +129,55 @@ public void processElement(
 
 // do lookup for acc msg
 if (RowDataUtil.isAccumulateMsg(in)) {
-// clear local state first
-deleteState();
+if (lookupJoinRunner.preFilter(in)) {
+// clear local state first
+deleteState();
 
-// fetcher has copied the input field when object reuse is enabled
-lookupJoinRunner.doFetch(in);
+// fetcher has copied the input field when object reuse is enabled
+lookupJoinRunner.doFetch(in);
 
-// update state with empty row if lookup miss or pre-filtered
-if (!collectListener.collected) {
-updateState(emptyRow);
+// update state with empty row if lookup miss or pre-filtered
+if (!collectListener.collected) {
+updateState(emptyRow);
+}
 }
-
 lookupJoinRunner.padNullForLeftJoin(in, out);
 } else {
-// do state access for non-acc msg
-if (lookupKeyContainsPrimaryKey) {
-RowData rightRow = uniqueState.value();
-// should distinguish null from empty(lookup miss)
-if (null == rightRow) {
-stateStaledErrorHandle(in, out);
-} else {
-collectDeleteRow(in, rightRow, out);
-}
-} else {
-List<RowData> rightRows = state.value();
-if (null == rightRows) {
-stateStaledErrorHandle(in, out);
+boolean collected = false;
+if (lookupJoinRunner.preFilter(in)) {

Review Comment:
   Got it






Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-21 Thread via GitHub


lincoln-lil commented on code in PR #24152:
URL: https://github.com/apache/flink/pull/24152#discussion_r1461293471


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java:
##
@@ -129,40 +129,55 @@ public void processElement(
 
 // do lookup for acc msg
 if (RowDataUtil.isAccumulateMsg(in)) {
-// clear local state first
-deleteState();
+if (lookupJoinRunner.preFilter(in)) {
+// clear local state first
+deleteState();
 
-// fetcher has copied the input field when object reuse is enabled
-lookupJoinRunner.doFetch(in);
+// fetcher has copied the input field when object reuse is enabled
+lookupJoinRunner.doFetch(in);
 
-// update state with empty row if lookup miss or pre-filtered
-if (!collectListener.collected) {
-updateState(emptyRow);
+// update state with empty row if lookup miss or pre-filtered
+if (!collectListener.collected) {
+updateState(emptyRow);
+}
 }
-
 lookupJoinRunner.padNullForLeftJoin(in, out);
 } else {
-// do state access for non-acc msg
-if (lookupKeyContainsPrimaryKey) {
-RowData rightRow = uniqueState.value();
-// should distinguish null from empty(lookup miss)
-if (null == rightRow) {
-stateStaledErrorHandle(in, out);
-} else {
-collectDeleteRow(in, rightRow, out);
-}
-} else {
-List<RowData> rightRows = state.value();
-if (null == rightRows) {
-stateStaledErrorHandle(in, out);
+boolean collected = false;
+if (lookupJoinRunner.preFilter(in)) {

Review Comment:
   The preFilter here only relates to local filter conditions on the left table, which can short-cut the lookup path (introduced by FLINK-18445), so the filter on the right table `b.name is not null` does not affect the preFilter logic.
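The split described in this reply can be sketched as follows. Everything here is hypothetical (the `ConditionSplitSketch` class, the `LeftRow`/`RightRow` records, and the `lookupCalls` counter are invented for illustration and are not Flink's `LookupJoinRunner` API): a predicate that references only left-side columns runs before the lookup and can skip it entirely, whereas a right-side predicate such as `b.name is not null` can only be evaluated on rows the lookup returns, which is why, per this discussion, Flink generates it into the fetcher rather than into the pre-filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of where lookup-join conditions are evaluated.
// Not Flink's LookupJoinRunner; names and types are illustrative only.
final class ConditionSplitSketch {
    record LeftRow(int id, String tag) {}
    record RightRow(int id, String name) {}

    private final Predicate<LeftRow> preFilter;             // references left-side columns only
    private final Function<Integer, List<RightRow>> lookup; // lookup by join key (a.id = b.id)
    private final Predicate<RightRow> rightFilter;          // e.g. b.name IS NOT NULL
    int lookupCalls = 0;                                     // counts calls to the lookup source

    ConditionSplitSketch(Predicate<LeftRow> preFilter,
                         Function<Integer, List<RightRow>> lookup,
                         Predicate<RightRow> rightFilter) {
        this.preFilter = preFilter;
        this.lookup = lookup;
        this.rightFilter = rightFilter;
    }

    // Returns the matched right rows; an empty list means "pre-filtered" or "lookup miss".
    List<RightRow> fetch(LeftRow in) {
        if (!preFilter.test(in)) {
            return List.of(); // short-cut: the lookup source is never called
        }
        lookupCalls++;
        List<RightRow> matched = new ArrayList<>();
        for (RightRow r : lookup.apply(in.id())) {
            if (rightFilter.test(r)) { // right-side condition applied after the lookup
                matched.add(r);
            }
        }
        return matched;
    }
}
```

A left row rejected by `preFilter` yields an empty result without calling `lookup`, while a row that passes it is looked up and only the fetched rows satisfying `rightFilter` are kept.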






Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-21 Thread via GitHub


luoyuxia commented on code in PR #24152:
URL: https://github.com/apache/flink/pull/24152#discussion_r1461275950


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java:
##
@@ -129,40 +129,55 @@ public void processElement(
 
 // do lookup for acc msg
 if (RowDataUtil.isAccumulateMsg(in)) {
-// clear local state first
-deleteState();
+if (lookupJoinRunner.preFilter(in)) {
+// clear local state first
+deleteState();
 
-// fetcher has copied the input field when object reuse is enabled
-lookupJoinRunner.doFetch(in);
+// fetcher has copied the input field when object reuse is enabled
+lookupJoinRunner.doFetch(in);
 
-// update state with empty row if lookup miss or pre-filtered
-if (!collectListener.collected) {
-updateState(emptyRow);
+// update state with empty row if lookup miss or pre-filtered
+if (!collectListener.collected) {
+updateState(emptyRow);
+}
 }
-
 lookupJoinRunner.padNullForLeftJoin(in, out);
 } else {
-// do state access for non-acc msg
-if (lookupKeyContainsPrimaryKey) {
-RowData rightRow = uniqueState.value();
-// should distinguish null from empty(lookup miss)
-if (null == rightRow) {
-stateStaledErrorHandle(in, out);
-} else {
-collectDeleteRow(in, rightRow, out);
-}
-} else {
-List<RowData> rightRows = state.value();
-if (null == rightRows) {
-stateStaledErrorHandle(in, out);
+boolean collected = false;
+if (lookupJoinRunner.preFilter(in)) {

Review Comment:
   If the filter condition is on the right table, will `lookupJoinRunner.preFilter(in)` be true?
   For example:
   ```sql
   from a join b FOR SYSTEM_TIME AS OF PROCTIME() 
   on a.id = b.id and b.name is not null
   ```
   It seems the filter `b.name is not null` will be code-generated into `generatedFetcher` instead of `preFilterCondition` in `LookupJoinRunner`.






Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-19 Thread via GitHub


flinkbot commented on PR #24152:
URL: https://github.com/apache/flink/pull/24152#issuecomment-1900715765

   
   ## CI report:
   
   * 0ac79867089732ecba26b32b985519be084e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-19 Thread via GitHub


lincoln-lil opened a new pull request, #24152:
URL: https://github.com/apache/flink/pull/24152

   ## What is the purpose of the change
   `KeyedLookupJoinWrapper` (used when 'table.optimizer.non-deterministic-update.strategy' is set to 'TRY_RESOLVE' and the lookup join has NDU problems) incorrectly processes delete messages for an inner join when the previous lookup result is empty.
   The cause is misaligned processing between the retract path (which accesses local state) and the add path (which fetches via the lookup function).
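The aligned behavior described above can be illustrated with a small, self-contained model. This is a hypothetical sketch, not the code in this PR: the class `AlignedLookupJoinSketch`, its `accumulate`/`retract` methods, String-typed rows, and the `HashMap` standing in for keyed state are all assumptions made for illustration. Both paths are gated by the same pre-filter, and a delete replays the cached lookup result (including an empty one) instead of consulting the lookup source, so an inner join emits nothing on delete when the earlier lookup missed, while a left join retracts its null-padded row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model, not Flink's KeyedLookupJoinWrapper: left rows are plain
// String keys, right rows are Strings, and keyed state is an in-memory map.
final class AlignedLookupJoinSketch {
    private final Predicate<String> preFilter;           // left-side local filter
    private final Function<String, List<String>> lookup; // dimension-table lookup
    private final boolean leftJoin;
    // Per-key cache of the last lookup result; an empty list records a lookup miss.
    private final Map<String, List<String>> state = new HashMap<>();

    AlignedLookupJoinSketch(Predicate<String> preFilter,
                            Function<String, List<String>> lookup,
                            boolean leftJoin) {
        this.preFilter = preFilter;
        this.lookup = lookup;
        this.leftJoin = leftJoin;
    }

    // Insert / update-after: look up, cache the result, emit joined rows.
    List<String> accumulate(String key) {
        List<String> out = new ArrayList<>();
        if (preFilter.test(key)) {
            List<String> rights = lookup.apply(key);
            state.put(key, new ArrayList<>(rights));
            for (String r : rights) {
                out.add("+" + key + "," + r);
            }
        }
        if (out.isEmpty() && leftJoin) {
            out.add("+" + key + ",null"); // null-pad only for a left join
        }
        return out;
    }

    // Delete / update-before: gate on the SAME pre-filter and replay the cached
    // result instead of looking up again, so the retraction mirrors the insert.
    List<String> retract(String key) {
        List<String> out = new ArrayList<>();
        if (preFilter.test(key)) {
            List<String> rights = state.remove(key);
            // null means no cached entry (e.g. expired state); the original code
            // routes that case to stateStaledErrorHandle, which this sketch omits.
            if (rights != null) {
                for (String r : rights) {
                    out.add("-" + key + "," + r);
                }
            }
        }
        if (out.isEmpty() && leftJoin) {
            out.add("-" + key + ",null");
        }
        return out;
    }
}
```

For example, with an inner join whose lookup misses for key `miss`, both `accumulate("miss")` and `retract("miss")` return an empty list; with a left join, they return `+miss,null` and `-miss,null` respectively.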
   
   ## Brief change log
   * align the retract and add processing logic
   * update related tests
   
   ## Verifying this change
   * KeyedLookupJoinHarnessTest
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
 - Does this pull request introduce a new feature? (no)

