[ 
https://issues.apache.org/jira/browse/FLINK-22730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-22730.
--------------------------------
    Fix Version/s: 1.13.2
       Resolution: Fixed

Fixed via:

master: 1635f9326c8993114d8a5aa1881c2078393f8eb5

release-1.13: 229b9c6be3521fea2e5c8fe51a4468918ecf8bb2

> Lookup join condition with CURRENT_DATE fails to filter records
> ---------------------------------------------------------------
>
>                 Key: FLINK-22730
>                 URL: https://issues.apache.org/jira/browse/FLINK-22730
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.0, 1.13.0, 1.14.0
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0, 1.13.2
>
>
> Add the following test case to 
> org.apache.flink.table.api.TableEnvironmentITCase to reproduce this bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   val id1 = TestValuesTableFactory.registerData(
>     Seq(Row.of("abc", LocalDateTime.of(2000, 1, 1, 0, 0))))
>   val ddl1 =
>     s"""
>        |CREATE TABLE Ta (
>        |  id VARCHAR,
>        |  ts TIMESTAMP,
>        |  proc AS PROCTIME()
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$id1',
>        |  'bounded' = 'true'
>        |)
>        |""".stripMargin
>   tEnv.executeSql(ddl1)
>   val id2 = TestValuesTableFactory.registerData(
>     Seq(Row.of("abc", LocalDateTime.of(2000, 1, 2, 0, 0))))
>   val ddl2 =
>     s"""
>        |CREATE TABLE Tb (
>        |  id VARCHAR,
>        |  ts TIMESTAMP
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$id2',
>        |  'bounded' = 'true'
>        |)
>        |""".stripMargin
>   tEnv.executeSql(ddl2)
>   val it = tEnv.executeSql(
>     """
>       |SELECT * FROM Ta AS t1
>       |INNER JOIN Tb FOR SYSTEM_TIME AS OF t1.proc AS t2
>       |ON t1.id = t2.id
>       |WHERE CAST(coalesce(t1.ts, t2.ts) AS VARCHAR) >= 
> CONCAT(CAST(CURRENT_DATE AS VARCHAR), ' 00:00:00')
>       |""".stripMargin).collect()
>   while (it.hasNext) {
>     System.out.println(it.next())
>   }
> }
> {code}
> The result is
> {code}
> +I[abc, 2000-01-01T00:00, 2021-05-20T14:30:47.735Z, abc, 2000-01-02T00:00]
> {code}
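> which is obviously incorrect: both timestamps are from the year 2000, so the 
> row should have been filtered out by the {{WHERE}} condition, which only keeps 
> rows whose timestamp is not earlier than {{CURRENT_DATE}}.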
> The generated operator is as follows
> {code:java}
> public class JoinTableFuncCollector$22 extends 
> org.apache.flink.table.runtime.collector.TableFunctionCollector {
>     org.apache.flink.table.data.GenericRowData out = new 
> org.apache.flink.table.data.GenericRowData(2);
>     org.apache.flink.table.data.utils.JoinedRowData joinedRow$9 = new 
> org.apache.flink.table.data.utils.JoinedRowData();
>     private static final java.util.TimeZone timeZone =
>             java.util.TimeZone.getTimeZone("Asia/Shanghai");
>     private org.apache.flink.table.data.TimestampData timestamp;
>     private org.apache.flink.table.data.TimestampData localTimestamp;
>     private int date;
>     private final org.apache.flink.table.data.binary.BinaryStringData str$17 
> = org.apache.flink.table.data.binary.BinaryStringData.fromString(" 00:00:00");
>     public JoinTableFuncCollector$22(Object[] references) throws Exception {
>     }
>     @Override
>     public void open(org.apache.flink.configuration.Configuration parameters) 
> throws Exception {
>     }
>     @Override
>     public void collect(Object record) throws Exception {
>         org.apache.flink.table.data.RowData in1 = 
> (org.apache.flink.table.data.RowData) getInput();
>         org.apache.flink.table.data.RowData in2 = 
> (org.apache.flink.table.data.RowData) record;
>         org.apache.flink.table.data.binary.BinaryStringData field$7;
>         boolean isNull$7;
>         org.apache.flink.table.data.TimestampData field$8;
>         boolean isNull$8;
>         org.apache.flink.table.data.TimestampData field$10;
>         boolean isNull$10;
>         boolean isNull$13;
>         org.apache.flink.table.data.binary.BinaryStringData result$14;
>         boolean isNull$15;
>         org.apache.flink.table.data.binary.BinaryStringData result$16;
>         boolean isNull$18;
>         org.apache.flink.table.data.binary.BinaryStringData result$19;
>         boolean isNull$20;
>         boolean result$21;
>         isNull$8 = in2.isNullAt(1);
>         field$8 = null;
>         if (!isNull$8) {
>             field$8 = in2.getTimestamp(1, 6);
>         }
>         isNull$7 = in2.isNullAt(0);
>         field$7 = 
> org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>         if (!isNull$7) {
>             field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) 
> in2.getString(0));
>         }
>         isNull$10 = in1.isNullAt(1);
>         field$10 = null;
>         if (!isNull$10) {
>             field$10 = in1.getTimestamp(1, 6);
>         }
>         boolean result$11 = !isNull$10;
>         org.apache.flink.table.data.TimestampData result$12 = null;
>         boolean isNull$12;
>         if (result$11) {
>             isNull$12 = isNull$10;
>             if (!isNull$12) {
>                 result$12 = field$10;
>             }
>         }
>         else {
>             isNull$12 = isNull$8;
>             if (!isNull$12) {
>                 result$12 = field$8;
>             }
>         }
>         isNull$13 = isNull$12;
>         result$14 = 
> org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>         if (!isNull$13) {
>             result$14 = 
> org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToString(result$12,
>  6));
>             isNull$13 = (result$14 == null);
>         }
>         isNull$15 = false;
>         result$16 = 
> org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>         if (!isNull$15) {
>             result$16 = 
> org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.calcite.avatica.util.DateTimeUtils.unixDateToString(((int)
>  date)));
>             isNull$15 = (result$16 == null);
>         }
>         result$19 = 
> org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(( isNull$15 ) 
> ? null : (result$16), ( false ) ? null : 
> (((org.apache.flink.table.data.binary.BinaryStringData) str$17)));
>         isNull$18 = (result$19 == null);
>         if (isNull$18) {
>             result$19 = 
> org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>         }
>         isNull$20 = isNull$13 || isNull$18;
>         result$21 = false;
>         if (!isNull$20) {
>             result$21 = ((result$14 == null) ? ((result$19 == null) ? 0 : -1) 
> : ((result$19 == null) ? 1 : (result$14.compareTo(result$19)))) >= 0;
>         }
>         if (result$21) {
>             if (isNull$7) {
>                 out.setField(0, null);
>             } else {
>                 out.setField(0, field$7);
>             }
>             if (isNull$8) {
>                 out.setField(1, null);
>             } else {
>                 out.setField(1, field$8);
>             }
>             joinedRow$9.replace(in1, out);
>             joinedRow$9.setRowKind(in1.getRowKind());
>             outputResult(joinedRow$9);
>         }
>     }
>     @Override
>     public void close() throws Exception {
>     }
> }
> {code}
> The member variable {{date}} is never initialized before use, so it keeps its 
> default value of 0 (i.e. 1970-01-01), which causes this bug.
> This is because 
> {{LookupJoinCodeGenerator#generateTableFunctionCollectorForJoinTable}} 
> forgets to use {{${ctx.reusePerRecordCode()}}} when generating the {{collect}} 
> method.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to