[ https://issues.apache.org/jira/browse/FLINK-22730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jingsong Lee closed FLINK-22730. -------------------------------- Fix Version/s: 1.13.2 Resolution: Fixed Fixed via: master: 1635f9326c8993114d8a5aa1881c2078393f8eb5 release-1.13: 229b9c6be3521fea2e5c8fe51a4468918ecf8bb2 > Lookup join condition with CURRENT_DATE fails to filter records > --------------------------------------------------------------- > > Key: FLINK-22730 > URL: https://issues.apache.org/jira/browse/FLINK-22730 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.12.0, 1.13.0, 1.14.0 > Reporter: Caizhi Weng > Assignee: Caizhi Weng > Priority: Major > Labels: pull-request-available > Fix For: 1.14.0, 1.13.2 > > > Add the following test case to > org.apache.flink.table.api.TableEnvironmentITCase to reproduce this bug. > {code:scala} > @Test > def myTest(): Unit = { > val id1 = TestValuesTableFactory.registerData( > Seq(Row.of("abc", LocalDateTime.of(2000, 1, 1, 0, 0)))) > val ddl1 = > s""" > |CREATE TABLE Ta ( > | id VARCHAR, > | ts TIMESTAMP, > | proc AS PROCTIME() > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$id1', > | 'bounded' = 'true' > |) > |""".stripMargin > tEnv.executeSql(ddl1) > val id2 = TestValuesTableFactory.registerData( > Seq(Row.of("abc", LocalDateTime.of(2000, 1, 2, 0, 0)))) > val ddl2 = > s""" > |CREATE TABLE Tb ( > | id VARCHAR, > | ts TIMESTAMP > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$id2', > | 'bounded' = 'true' > |) > |""".stripMargin > tEnv.executeSql(ddl2) > val it = tEnv.executeSql( > """ > |SELECT * FROM Ta AS t1 > |INNER JOIN Tb FOR SYSTEM_TIME AS OF t1.proc AS t2 > |ON t1.id = t2.id > |WHERE CAST(coalesce(t1.ts, t2.ts) AS VARCHAR) >= > CONCAT(CAST(CURRENT_DATE AS VARCHAR), ' 00:00:00') > |""".stripMargin).collect() > while (it.hasNext) { > System.out.println(it.next()) > } > } > {code} > The result is > {code} > +I[abc, 2000-01-01T00:00, 2021-05-20T14:30:47.735Z, abc, 2000-01-02T00:00] > {code} > which is obviously incorrect. 
> The generated operator is as follows > {code:java} > public class JoinTableFuncCollector$22 extends > org.apache.flink.table.runtime.collector.TableFunctionCollector { > org.apache.flink.table.data.GenericRowData out = new > org.apache.flink.table.data.GenericRowData(2); > org.apache.flink.table.data.utils.JoinedRowData joinedRow$9 = new > org.apache.flink.table.data.utils.JoinedRowData(); > private static final java.util.TimeZone timeZone = > java.util.TimeZone.getTimeZone("Asia/Shanghai"); > private org.apache.flink.table.data.TimestampData timestamp; > private org.apache.flink.table.data.TimestampData localTimestamp; > private int date; > private final org.apache.flink.table.data.binary.BinaryStringData str$17 > = org.apache.flink.table.data.binary.BinaryStringData.fromString(" 00:00:00"); > public JoinTableFuncCollector$22(Object[] references) throws Exception { > } > @Override > public void open(org.apache.flink.configuration.Configuration parameters) > throws Exception { > } > @Override > public void collect(Object record) throws Exception { > org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) getInput(); > org.apache.flink.table.data.RowData in2 = > (org.apache.flink.table.data.RowData) record; > org.apache.flink.table.data.binary.BinaryStringData field$7; > boolean isNull$7; > org.apache.flink.table.data.TimestampData field$8; > boolean isNull$8; > org.apache.flink.table.data.TimestampData field$10; > boolean isNull$10; > boolean isNull$13; > org.apache.flink.table.data.binary.BinaryStringData result$14; > boolean isNull$15; > org.apache.flink.table.data.binary.BinaryStringData result$16; > boolean isNull$18; > org.apache.flink.table.data.binary.BinaryStringData result$19; > boolean isNull$20; > boolean result$21; > isNull$8 = in2.isNullAt(1); > field$8 = null; > if (!isNull$8) { > field$8 = in2.getTimestamp(1, 6); > } > isNull$7 = in2.isNullAt(0); > field$7 = > org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; 
> if (!isNull$7) { > field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) > in2.getString(0)); > } > isNull$10 = in1.isNullAt(1); > field$10 = null; > if (!isNull$10) { > field$10 = in1.getTimestamp(1, 6); > } > boolean result$11 = !isNull$10; > org.apache.flink.table.data.TimestampData result$12 = null; > boolean isNull$12; > if (result$11) { > isNull$12 = isNull$10; > if (!isNull$12) { > result$12 = field$10; > } > } > else { > isNull$12 = isNull$8; > if (!isNull$12) { > result$12 = field$8; > } > } > isNull$13 = isNull$12; > result$14 = > org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; > if (!isNull$13) { > result$14 = > org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToString(result$12, > 6)); > isNull$13 = (result$14 == null); > } > isNull$15 = false; > result$16 = > org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; > if (!isNull$15) { > result$16 = > org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.calcite.avatica.util.DateTimeUtils.unixDateToString(((int) > date))); > isNull$15 = (result$16 == null); > } > result$19 = > org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(( isNull$15 ) > ? null : (result$16), ( false ) ? null : > (((org.apache.flink.table.data.binary.BinaryStringData) str$17))); > isNull$18 = (result$19 == null); > if (isNull$18) { > result$19 = > org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; > } > isNull$20 = isNull$13 || isNull$18; > result$21 = false; > if (!isNull$20) { > result$21 = ((result$14 == null) ? ((result$19 == null) ? 0 : -1) > : ((result$19 == null) ? 
1 : (result$14.compareTo(result$19)))) >= 0; > } > if (result$21) { > if (isNull$7) { > out.setField(0, null); > } else { > out.setField(0, field$7); > } > if (isNull$8) { > out.setField(1, null); > } else { > out.setField(1, field$8); > } > joinedRow$9.replace(in1, out); > joinedRow$9.setRowKind(in1.getRowKind()); > outputResult(joinedRow$9); > } > } > @Override > public void close() throws Exception { > } > } > {code} > The member variable {{date}} is never assigned before it is used, so it keeps > its default value of 0. {{CURRENT_DATE}} is therefore rendered as the epoch date > (1970-01-01) and the {{>=}} comparison passes for the year-2000 row, which > causes this bug. > This happens because > {{LookupJoinCodeGenerator#generateTableFunctionCollectorForJoinTable}} > does not emit {{${ctx.reusePerRecordCode()}}} when generating the {{collect}} > method. -- This message was sent by Atlassian Jira (v8.3.4#803005)