This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7e4a2f3fe6c558a08ec68dcb8e21ba43e85f3cf1
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Mon Nov 27 10:47:56 2023 -0800

    [FLINK-33647] Implement restore tests for LookupJoin node
---
 .../nodes/exec/stream/LookupJoinRestoreTest.java   |  46 +++
 .../nodes/exec/stream/LookupJoinTestPrograms.java  | 388 +++++++++++++++++++++
 .../plan/nodes/exec/testutils/RestoreTestBase.java |   3 -
 .../plan/lookup-join-async-hint.json               | 251 +++++++++++++
 .../lookup-join-async-hint/savepoint/_metadata     | Bin 0 -> 15900 bytes
 .../plan/lookup-join-filter-pushdown.json          | 264 ++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 13735 bytes
 .../plan/lookup-join-left-join.json                | 279 +++++++++++++++
 .../lookup-join-left-join/savepoint/_metadata      | Bin 0 -> 14069 bytes
 .../plan/lookup-join-post-filter.json              | 266 ++++++++++++++
 .../lookup-join-post-filter/savepoint/_metadata    | Bin 0 -> 14021 bytes
 .../plan/lookup-join-pre-filter.json               | 279 +++++++++++++++
 .../lookup-join-pre-filter/savepoint/_metadata     | Bin 0 -> 14005 bytes
 .../plan/lookup-join-pre-post-filter.json          | 281 +++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 13957 bytes
 .../plan/lookup-join-project-pushdown.json         | 248 +++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 13139 bytes
 .../plan/lookup-join-retry-hint.json               | 252 +++++++++++++
 .../lookup-join-retry-hint/savepoint/_metadata     | Bin 0 -> 14135 bytes
 19 files changed, 2554 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java
new file mode 100644
index 00000000000..42ca645162b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecLookupJoin}. */
+public class LookupJoinRestoreTest extends RestoreTestBase {
+
+    public LookupJoinRestoreTest() {
+        super(StreamExecLookupJoin.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                LookupJoinTestPrograms.LOOKUP_JOIN_PROJECT_PUSHDOWN,
+                LookupJoinTestPrograms.LOOKUP_JOIN_FILTER_PUSHDOWN,
+                LookupJoinTestPrograms.LOOKUP_JOIN_LEFT_JOIN,
+                LookupJoinTestPrograms.LOOKUP_JOIN_PRE_FILTER,
+                LookupJoinTestPrograms.LOOKUP_JOIN_POST_FILTER,
+                LookupJoinTestPrograms.LOOKUP_JOIN_PRE_POST_FILTER,
+                LookupJoinTestPrograms.LOOKUP_JOIN_ASYNC_HINT,
+                LookupJoinTestPrograms.LOOKUP_JOIN_RETRY_HINT);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java
new file mode 100644
index 00000000000..9b5c18b3f98
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecLookupJoin}. */
+public class LookupJoinTestPrograms {
+
+    static final String[] CUSTOMERS_SCHEMA =
+            new String[] {
+                "id INT PRIMARY KEY NOT ENFORCED",
+                "name STRING",
+                "age INT",
+                "city STRING",
+                "state STRING",
+                "zipcode INT"
+            };
+
+    static final Row[] CUSTOMERS_BEFORE_DATA =
+            new Row[] {
+                Row.of(1, "Bob", 28, "Mountain View", "California", 94043),
+                Row.of(2, "Alice", 32, "San Francisco", "California", 95016),
+                Row.of(3, "Claire", 37, "Austin", "Texas", 73301),
+                Row.of(4, "Shannon", 29, "Boise", "Idaho", 83701),
+                Row.of(5, "Jake", 42, "New York City", "New York", 10001)
+            };
+
+    static final Row[] CUSTOMERS_AFTER_DATA =
+            new Row[] {
+                Row.of(1, "Bob", 28, "San Jose", "California", 94089),
+                Row.of(6, "Joana", 54, "Atlanta", "Georgia", 30033)
+            };
+
+    static final SourceTestStep CUSTOMERS =
+            SourceTestStep.newBuilder("customers_t")
+                    .addOption("disable-lookup", "false") // static/lookup 
table
+                    .addOption("filterable-fields", "age")
+                    .addSchema(CUSTOMERS_SCHEMA)
+                    // Note: The before/after data is used to initialize the lookup table.
+                    // The data is not consumed like regular tables since lookup tables are
+                    // external tables with data already present in them.
+                    // Therefore, no state is persisted for lookup tables.
+                    .producedBeforeRestore(CUSTOMERS_BEFORE_DATA)
+                    .producedAfterRestore(CUSTOMERS_AFTER_DATA)
+                    .build();
+
+    static final SourceTestStep CUSTOMERS_ASYNC =
+            SourceTestStep.newBuilder("customers_t")
+                    .addOption("disable-lookup", "false") // static/lookup 
table
+                    .addOption("filterable-fields", "age")
+                    .addOption("async", "true")
+                    .addSchema(CUSTOMERS_SCHEMA)
+                    // Note: The before/after data is used to initialize the lookup table.
+                    // The data is not consumed like regular tables since lookup tables are
+                    // external tables with data already present in them.
+                    // Therefore, no state is persisted for lookup tables.
+                    .producedBeforeRestore(CUSTOMERS_BEFORE_DATA)
+                    .producedAfterRestore(CUSTOMERS_AFTER_DATA)
+                    .build();
+
+    static final SourceTestStep ORDERS =
+            SourceTestStep.newBuilder("orders_t")
+                    .addOption("filterable-fields", "customer_id")
+                    .addSchema(
+                            "order_id INT",
+                            "customer_id INT",
+                            "total DOUBLE",
+                            "order_time STRING",
+                            "proc_time AS PROCTIME()")
+                    .producedBeforeRestore(
+                            Row.of(1, 3, 44.44, "2020-10-10 00:00:01"),
+                            Row.of(2, 5, 100.02, "2020-10-10 00:00:02"),
+                            Row.of(4, 2, 92.61, "2020-10-10 00:00:04"),
+                            Row.of(3, 1, 23.89, "2020-10-10 00:00:03"),
+                            Row.of(6, 4, 7.65, "2020-10-10 00:00:06"),
+                            Row.of(5, 2, 12.78, "2020-10-10 00:00:05"))
+                    .producedAfterRestore(
+                            Row.of(7, 6, 17.58, "2020-10-10 00:00:07"), // new 
customer
+                            Row.of(9, 1, 143.21, "2020-10-10 00:00:08") // 
updated zip code
+                            )
+                    .build();
+
+    static final List<String> SINK_SCHEMA =
+            Arrays.asList(
+                    "order_id INT",
+                    "total DOUBLE",
+                    "id INT",
+                    "name STRING",
+                    "age INT",
+                    "city STRING",
+                    "state STRING",
+                    "zipcode INT");
+
+    static final TableTestProgram LOOKUP_JOIN_PROJECT_PUSHDOWN =
+            TableTestProgram.of(
+                            "lookup-join-project-pushdown",
+                            "validates lookup join with project pushdown")
+                    .setupTableSource(CUSTOMERS)
+                    .setupTableSource(ORDERS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            SINK_SCHEMA.stream()
+                                                    .filter(field -> 
!field.equals("age INT"))
+                                                    
.collect(Collectors.toList()))
+                                    .consumedBeforeRestore(
+                                            "+I[1, 44.44, 3, Claire, Austin, 
Texas, 73301]",
+                                            "+I[2, 100.02, 5, Jake, New York 
City, New York, 10001]",
+                                            "+I[4, 92.61, 2, Alice, San 
Francisco, California, 95016]",
+                                            "+I[3, 23.89, 1, Bob, Mountain 
View, California, 94043]",
+                                            "+I[6, 7.65, 4, Shannon, Boise, 
Idaho, 83701]",
+                                            "+I[5, 12.78, 2, Alice, San 
Francisco, California, 95016]")
+                                    .consumedAfterRestore(
+                                            "+I[7, 17.58, 6, Joana, Atlanta, 
Georgia, 30033]",
+                                            "+I[9, 143.21, 1, Bob, San Jose, 
California, 94089]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "O.order_id, "
+                                    + "O.total, "
+                                    + "C.id, "
+                                    + "C.name, "
+                                    + "C.city, "
+                                    + "C.state, "
+                                    + "C.zipcode "
+                                    + "FROM orders_t as O "
+                                    + "JOIN customers_t FOR SYSTEM_TIME AS OF 
O.proc_time AS C "
+                                    + "ON O.customer_id = C.id")
+                    .build();
+
+    static final TableTestProgram LOOKUP_JOIN_FILTER_PUSHDOWN =
+            TableTestProgram.of(
+                            "lookup-join-filter-pushdown",
+                            "validates lookup join with filter pushdown")
+                    .setupTableSource(CUSTOMERS)
+                    .setupTableSource(ORDERS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[1, 44.44, 3, Claire, 37, 
Austin, Texas, 73301]",
+                                            "+I[2, 100.02, 5, Jake, 42, New 
York City, New York, 10001]",
+                                            "+I[4, 92.61, 2, Alice, 32, San 
Francisco, California, 95016]",
+                                            "+I[5, 12.78, 2, Alice, 32, San 
Francisco, California, 95016]")
+                                    .consumedAfterRestore(
+                                            "+I[7, 17.58, 6, Joana, 54, 
Atlanta, Georgia, 30033]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "O.order_id, "
+                                    + "O.total, "
+                                    + "C.id, "
+                                    + "C.name, "
+                                    + "C.age, "
+                                    + "C.city, "
+                                    + "C.state, "
+                                    + "C.zipcode "
+                                    + "FROM orders_t as O "
+                                    + "JOIN customers_t FOR SYSTEM_TIME AS OF 
O.proc_time AS C "
+                                    + "ON O.customer_id = C.id AND C.age > 30")
+                    .build();
+
+    static final TableTestProgram LOOKUP_JOIN_PRE_FILTER =
+            TableTestProgram.of("lookup-join-pre-filter", "validates lookup 
join with pre filter")
+                    .setupTableSource(CUSTOMERS)
+                    .setupTableSource(ORDERS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[1, 44.44, 3, Claire, 37, 
Austin, Texas, 73301]",
+                                            "+I[2, 100.02, 5, Jake, 42, New 
York City, New York, 10001]",
+                                            "+I[4, 92.61, 2, Alice, 32, San 
Francisco, California, 95016]",
+                                            "+I[3, 23.89, null, null, null, 
null, null, null]",
+                                            "+I[6, 7.65, null, null, null, 
null, null, null]",
+                                            "+I[5, 12.78, null, null, null, 
null, null, null]")
+                                    .consumedAfterRestore(
+                                            "+I[7, 17.58, 6, Joana, 54, 
Atlanta, Georgia, 30033]",
+                                            "+I[9, 143.21, null, null, null, 
null, null, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "O.order_id, "
+                                    + "O.total, "
+                                    + "C.id, "
+                                    + "C.name, "
+                                    + "C.age, "
+                                    + "C.city, "
+                                    + "C.state, "
+                                    + "C.zipcode "
+                                    + "FROM orders_t as O "
+                                    + "LEFT JOIN customers_t FOR SYSTEM_TIME 
AS OF O.proc_time AS C "
+                                    + "ON O.customer_id = C.id AND C.age > 30 
AND O.total > 15.3")
+                    .build();
+
+    static final TableTestProgram LOOKUP_JOIN_POST_FILTER =
+            TableTestProgram.of("lookup-join-post-filter", "validates lookup 
join with post filter")
+                    .setupTableSource(CUSTOMERS)
+                    .setupTableSource(ORDERS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[1, 44.44, null, null, null, 
null, null, null]",
+                                            "+I[2, 100.02, null, null, null, 
null, null, null]",
+                                            "+I[4, 92.61, null, null, null, 
null, null, null]",
+                                            "+I[3, 23.89, 1, Bob, 28, Mountain 
View, California, 94043]",
+                                            "+I[6, 7.65, 4, Shannon, 29, 
Boise, Idaho, 83701]",
+                                            "+I[5, 12.78, 2, Alice, 32, San 
Francisco, California, 95016]")
+                                    .consumedAfterRestore(
+                                            "+I[7, 17.58, 6, Joana, 54, 
Atlanta, Georgia, 30033]",
+                                            "+I[9, 143.21, null, null, null, 
null, null, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "O.order_id, "
+                                    + "O.total, "
+                                    + "C.id, "
+                                    + "C.name, "
+                                    + "C.age, "
+                                    + "C.city, "
+                                    + "C.state, "
+                                    + "C.zipcode "
+                                    + "FROM orders_t as O "
+                                    + "LEFT JOIN customers_t FOR SYSTEM_TIME 
AS OF O.proc_time AS C "
+                                    + "ON O.customer_id = C.id AND 
CAST(O.total AS INT) < C.age")
+                    .build();
+
+    static final TableTestProgram LOOKUP_JOIN_PRE_POST_FILTER =
+            TableTestProgram.of(
+                            "lookup-join-pre-post-filter",
+                            "validates lookup join with pre and post filters")
+                    .setupTableSource(CUSTOMERS)
+                    .setupTableSource(ORDERS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[1, 44.44, null, null, null, 
null, null, null]",
+                                            "+I[2, 100.02, null, null, null, 
null, null, null]",
+                                            "+I[4, 92.61, null, null, null, 
null, null, null]",
+                                            "+I[3, 23.89, 1, Bob, 28, Mountain 
View, California, 94043]",
+                                            "+I[6, 7.65, null, null, null, 
null, null, null]",
+                                            "+I[5, 12.78, null, null, null, 
null, null, null]")
+                                    .consumedAfterRestore(
+                                            "+I[7, 17.58, 6, Joana, 54, 
Atlanta, Georgia, 30033]",
+                                            "+I[9, 143.21, null, null, null, 
null, null, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "O.order_id, "
+                                    + "O.total, "
+                                    + "C.id, "
+                                    + "C.name, "
+                                    + "C.age, "
+                                    + "C.city, "
+                                    + "C.state, "
+                                    + "C.zipcode "
+                                    + "FROM orders_t as O "
+                                    + "LEFT JOIN customers_t FOR SYSTEM_TIME 
AS OF O.proc_time AS C "
+                                    + "ON O.customer_id = C.id AND O.total > 
15.3 AND CAST(O.total AS INT) < C.age")
+                    .build();
+
+    static final TableTestProgram LOOKUP_JOIN_LEFT_JOIN =
+            TableTestProgram.of("lookup-join-left-join", "validates lookup 
join with left join")
+                    .setupTableSource(CUSTOMERS)
+                    .setupTableSource(ORDERS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[1, 44.44, null, null, null, 
null, null, null]",
+                                            "+I[2, 100.02, 5, Jake, 42, New 
York City, New York, 10001]",
+                                            "+I[4, 92.61, 2, Alice, 32, San 
Francisco, California, 95016]",
+                                            "+I[3, 23.89, null, null, null, 
null, null, null]",
+                                            "+I[6, 7.65, null, null, null, 
null, null, null]",
+                                            "+I[5, 12.78, 2, Alice, 32, San 
Francisco, California, 95016]")
+                                    .consumedAfterRestore(
+                                            "+I[7, 17.58, 6, Joana, 54, 
Atlanta, Georgia, 30033]",
+                                            "+I[9, 143.21, null, null, null, 
null, null, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "O.order_id, "
+                                    + "O.total, "
+                                    + "C.id, "
+                                    + "C.name, "
+                                    + "C.age, "
+                                    + "C.city, "
+                                    + "C.state, "
+                                    + "C.zipcode "
+                                    + "FROM orders_t as O "
+                                    + "LEFT JOIN customers_t FOR SYSTEM_TIME 
AS OF O.proc_time AS C "
+                                    + "ON O.customer_id = C.id AND C.age > 30 
AND O.customer_id <> 3")
+                    .build();
+
+    static final TableTestProgram LOOKUP_JOIN_ASYNC_HINT =
+            TableTestProgram.of("lookup-join-async-hint", "validates lookup 
join with async hint")
+                    .setupTableSource(CUSTOMERS_ASYNC)
+                    .setupTableSource(ORDERS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[1, 44.44, 3, Claire, 37, 
Austin, Texas, 73301]",
+                                            "+I[2, 100.02, 5, Jake, 42, New 
York City, New York, 10001]",
+                                            "+I[4, 92.61, 2, Alice, 32, San 
Francisco, California, 95016]",
+                                            "+I[3, 23.89, 1, Bob, 28, Mountain 
View, California, 94043]",
+                                            "+I[6, 7.65, 4, Shannon, 29, 
Boise, Idaho, 83701]",
+                                            "+I[5, 12.78, 2, Alice, 32, San 
Francisco, California, 95016]")
+                                    .consumedAfterRestore(
+                                            "+I[7, 17.58, 6, Joana, 54, 
Atlanta, Georgia, 30033]",
+                                            "+I[9, 143.21, 1, Bob, 28, San 
Jose, California, 94089]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "/*+ LOOKUP('table'='C', 'async'='true', 
'output-mode'='allow_unordered', 'timeout'='500s', 'capacity'='2000') */ "
+                                    + "O.order_id, "
+                                    + "O.total, "
+                                    + "C.id, "
+                                    + "C.name, "
+                                    + "C.age, "
+                                    + "C.city, "
+                                    + "C.state, "
+                                    + "C.zipcode "
+                                    + "FROM orders_t as O "
+                                    + "JOIN customers_t FOR SYSTEM_TIME AS OF 
O.proc_time AS C "
+                                    + "ON O.customer_id = C.id")
+                    .build();
+
+    static final TableTestProgram LOOKUP_JOIN_RETRY_HINT =
+            TableTestProgram.of("lookup-join-retry-hint", "validates lookup 
join with retry hint")
+                    .setupTableSource(CUSTOMERS)
+                    .setupTableSource(ORDERS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[1, 44.44, 3, Claire, 37, 
Austin, Texas, 73301]",
+                                            "+I[2, 100.02, 5, Jake, 42, New 
York City, New York, 10001]",
+                                            "+I[4, 92.61, 2, Alice, 32, San 
Francisco, California, 95016]",
+                                            "+I[3, 23.89, 1, Bob, 28, Mountain 
View, California, 94043]",
+                                            "+I[6, 7.65, 4, Shannon, 29, 
Boise, Idaho, 83701]",
+                                            "+I[5, 12.78, 2, Alice, 32, San 
Francisco, California, 95016]")
+                                    .consumedAfterRestore(
+                                            "+I[7, 17.58, 6, Joana, 54, 
Atlanta, Georgia, 30033]",
+                                            "+I[9, 143.21, 1, Bob, 28, San 
Jose, California, 94089]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "/*+ LOOKUP('table'='C', 
'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 
'fixed-delay'='10s', 'max-attempts'='3') */ "
+                                    + "O.order_id, "
+                                    + "O.total, "
+                                    + "C.id, "
+                                    + "C.name, "
+                                    + "C.age, "
+                                    + "C.city, "
+                                    + "C.state, "
+                                    + "C.zipcode "
+                                    + "FROM orders_t as O "
+                                    + "JOIN customers_t FOR SYSTEM_TIME AS OF 
O.proc_time AS C "
+                                    + "ON O.customer_id = C.id")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index 7c07bf9bf51..8dd4646df10 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -193,7 +193,6 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
             options.put("connector", "values");
             options.put("data-id", id);
             options.put("terminating", "false");
-            options.put("disable-lookup", "true");
             options.put("runtime-source", "NewSource");
             sourceTestStep.apply(tEnv, options);
         }
@@ -203,7 +202,6 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
             registerSinkObserver(futures, sinkTestStep, true);
             final Map<String, String> options = new HashMap<>();
             options.put("connector", "values");
-            options.put("disable-lookup", "true");
             options.put("sink-insert-only", "false");
             sinkTestStep.apply(tEnv, options);
         }
@@ -252,7 +250,6 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
             final Map<String, String> options = new HashMap<>();
             options.put("connector", "values");
             options.put("data-id", id);
-            options.put("disable-lookup", "true");
             options.put("runtime-source", "NewSource");
             if (afterRestoreSource == AfterRestoreSource.INFINITE) {
                 options.put("terminating", "false");
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/plan/lookup-join-async-hint.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/plan/lookup-join-async-hint.json
new file mode 100644
index 00000000000..8538f0cffe1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/plan/lookup-join-async-hint.json
@@ -0,0 +1,251 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 25,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`orders_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "customer_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "proc_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, orders_t, project=[order_id, customer_id, total], 
metadata=[]]], fields=[order_id, customer_id, total])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 26,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "INNER",
+    "joinCondition" : null,
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`customers_t`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT NOT NULL"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              }, {
+                "name" : "city",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "state",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "zipcode",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ],
+              "primaryKey" : {
+                "name" : "PK_id",
+                "type" : "PRIMARY_KEY",
+                "columns" : [ "id" ]
+              }
+            },
+            "partitionKeys" : [ ]
+          }
+        }
+      },
+      "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` 
INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> 
NOT NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 1
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "lookupKeyContainsPrimaryKey" : true,
+    "asyncOptions" : {
+      "capacity " : 2000,
+      "timeout" : 500000,
+      "output-mode" : "UNORDERED"
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, 
`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` 
VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>",
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.customers_t], 
joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, 
total, id, name, age, city, state, zipcode], async=[UNORDERED, 500000ms, 2000])"
+  }, {
+    "id" : 27,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 7,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 8,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, 
`name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Calc(select=[order_id, total, id, name, age, city, state, 
zipcode])"
+  }, {
+    "id" : 28,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "age",
+              "dataType" : "INT"
+            }, {
+              "name" : "city",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "state",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "zipcode",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, 
`name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[order_id, total, id, name, age, city, state, zipcode])"
+  } ],
+  "edges" : [ {
+    "source" : 25,
+    "target" : 26,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 26,
+    "target" : 27,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 27,
+    "target" : 28,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/savepoint/_metadata
new file mode 100644
index 00000000000..f811fb0517d
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/plan/lookup-join-filter-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/plan/lookup-join-filter-pushdown.json
new file mode 100644
index 00000000000..325f14c90a3
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/plan/lookup-join-filter-pushdown.json
@@ -0,0 +1,264 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 5,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`orders_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "customer_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "proc_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, orders_t, project=[order_id, customer_id, total], 
metadata=[]]], fields=[order_id, customer_id, total])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "INNER",
+    "joinCondition" : null,
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`customers_t`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT NOT NULL"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              }, {
+                "name" : "city",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "state",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "zipcode",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ],
+              "primaryKey" : {
+                "name" : "PK_id",
+                "type" : "PRIMARY_KEY",
+                "columns" : [ "id" ]
+              }
+            },
+            "partitionKeys" : [ ]
+          }
+        },
+        "abilities" : [ {
+          "type" : "FilterPushDown",
+          "predicates" : [ {
+            "kind" : "CALL",
+            "syntax" : "BINARY",
+            "internalName" : "$>$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 2,
+              "type" : "INT"
+            }, {
+              "kind" : "LITERAL",
+              "value" : 30,
+              "type" : "INT NOT NULL"
+            } ],
+            "type" : "BOOLEAN"
+          } ]
+        } ]
+      },
+      "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` 
INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> 
NOT NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 1
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "lookupKeyContainsPrimaryKey" : true,
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, 
`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` 
VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>",
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.customers_t], 
joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, 
total, id, name, age, city, state, zipcode])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 7,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 8,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, 
`name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Calc(select=[order_id, total, id, name, age, city, state, 
zipcode])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "age",
+              "dataType" : "INT"
+            }, {
+              "name" : "city",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "state",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "zipcode",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, 
`name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[order_id, total, id, name, age, city, state, zipcode])"
+  } ],
+  "edges" : [ {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/savepoint/_metadata
new file mode 100644
index 00000000000..fa7a32bb113
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/plan/lookup-join-left-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/plan/lookup-join-left-join.json
new file mode 100644
index 00000000000..80faae47702
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/plan/lookup-join-left-join.json
@@ -0,0 +1,279 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 9,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`orders_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "customer_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "proc_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, orders_t, project=[order_id, customer_id, total], 
metadata=[]]], fields=[order_id, customer_id, total])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "LEFT",
+    "preFilterCondition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$<>$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 3,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "joinCondition" : null,
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`customers_t`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT NOT NULL"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              }, {
+                "name" : "city",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "state",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "zipcode",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ],
+              "primaryKey" : {
+                "name" : "PK_id",
+                "type" : "PRIMARY_KEY",
+                "columns" : [ "id" ]
+              }
+            },
+            "partitionKeys" : [ ]
+          }
+        },
+        "abilities" : [ {
+          "type" : "FilterPushDown",
+          "predicates" : [ {
+            "kind" : "CALL",
+            "syntax" : "BINARY",
+            "internalName" : "$>$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 2,
+              "type" : "INT"
+            }, {
+              "kind" : "LITERAL",
+              "value" : 30,
+              "type" : "INT NOT NULL"
+            } ],
+            "type" : "BOOLEAN"
+          } ]
+        } ]
+      },
+      "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` 
INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> 
NOT NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 1
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "lookupKeyContainsPrimaryKey" : true,
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, 
`id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), 
`state` VARCHAR(2147483647), `zipcode` INT>",
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.customers_t], 
joinType=[LeftOuterJoin], lookup=[id=customer_id], joinCondition=[(customer_id 
<> 3)], select=[order_id, customer_id, total, id, name, age, city, state, 
zipcode])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 7,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 8,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` 
VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Calc(select=[order_id, total, id, name, age, city, state, 
zipcode])"
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "age",
+              "dataType" : "INT"
+            }, {
+              "name" : "city",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "state",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "zipcode",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` 
VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[order_id, total, id, name, age, city, state, zipcode])"
+  } ],
+  "edges" : [ {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/savepoint/_metadata
new file mode 100644
index 00000000000..ac7367cc552
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/plan/lookup-join-post-filter.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/plan/lookup-join-post-filter.json
new file mode 100644
index 00000000000..fd2f1825e57
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/plan/lookup-join-post-filter.json
@@ -0,0 +1,266 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 17,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`orders_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "customer_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "proc_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, orders_t, project=[order_id, customer_id, total], 
metadata=[]]], fields=[order_id, customer_id, total])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "LEFT",
+    "joinCondition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$<$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$CAST$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "DOUBLE"
+        } ],
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 5,
+        "type" : "INT"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`customers_t`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT NOT NULL"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              }, {
+                "name" : "city",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "state",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "zipcode",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ],
+              "primaryKey" : {
+                "name" : "PK_id",
+                "type" : "PRIMARY_KEY",
+                "columns" : [ "id" ]
+              }
+            },
+            "partitionKeys" : [ ]
+          }
+        }
+      },
+      "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` 
INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> 
NOT NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 1
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "lookupKeyContainsPrimaryKey" : true,
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, 
`id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), 
`state` VARCHAR(2147483647), `zipcode` INT>",
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.customers_t], 
joinType=[LeftOuterJoin], lookup=[id=customer_id], joinCondition=[(CAST(total 
AS INTEGER) < age)], select=[order_id, customer_id, total, id, name, age, city, 
state, zipcode])"
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 7,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 8,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` 
VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Calc(select=[order_id, total, id, name, age, city, state, 
zipcode])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "age",
+              "dataType" : "INT"
+            }, {
+              "name" : "city",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "state",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "zipcode",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` 
VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[order_id, total, id, name, age, city, state, zipcode])"
+  } ],
+  "edges" : [ {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/savepoint/_metadata
new file mode 100644
index 00000000000..4aea6a1a2d9
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/plan/lookup-join-pre-filter.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/plan/lookup-join-pre-filter.json
new file mode 100644
index 00000000000..a7c551450f3
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/plan/lookup-join-pre-filter.json
@@ -0,0 +1,279 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 13,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`orders_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "customer_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "proc_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, orders_t, project=[order_id, customer_id, total], 
metadata=[]]], fields=[order_id, customer_id, total])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "LEFT",
+    "preFilterCondition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$>$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "DOUBLE"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "15.3",
+        "type" : "DECIMAL(3, 1) NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "joinCondition" : null,
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`customers_t`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT NOT NULL"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              }, {
+                "name" : "city",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "state",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "zipcode",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ],
+              "primaryKey" : {
+                "name" : "PK_id",
+                "type" : "PRIMARY_KEY",
+                "columns" : [ "id" ]
+              }
+            },
+            "partitionKeys" : [ ]
+          }
+        },
+        "abilities" : [ {
+          "type" : "FilterPushDown",
+          "predicates" : [ {
+            "kind" : "CALL",
+            "syntax" : "BINARY",
+            "internalName" : "$>$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 2,
+              "type" : "INT"
+            }, {
+              "kind" : "LITERAL",
+              "value" : 30,
+              "type" : "INT NOT NULL"
+            } ],
+            "type" : "BOOLEAN"
+          } ]
+        } ]
+      },
+      "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` 
INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> 
NOT NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 1
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "lookupKeyContainsPrimaryKey" : true,
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, 
`id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), 
`state` VARCHAR(2147483647), `zipcode` INT>",
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.customers_t], 
joinType=[LeftOuterJoin], lookup=[id=customer_id], joinCondition=[(total > 
15.3)], select=[order_id, customer_id, total, id, name, age, city, state, 
zipcode])"
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 7,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 8,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` 
VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Calc(select=[order_id, total, id, name, age, city, state, 
zipcode])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "age",
+              "dataType" : "INT"
+            }, {
+              "name" : "city",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "state",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "zipcode",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` 
VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[order_id, total, id, name, age, city, state, zipcode])"
+  } ],
+  "edges" : [ {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/savepoint/_metadata
new file mode 100644
index 00000000000..14c370e5f13
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/plan/lookup-join-pre-post-filter.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/plan/lookup-join-pre-post-filter.json
new file mode 100644
index 00000000000..df07598a99c
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/plan/lookup-join-pre-post-filter.json
@@ -0,0 +1,281 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 21,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`orders_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "customer_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "proc_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, orders_t, project=[order_id, customer_id, total], 
metadata=[]]], fields=[order_id, customer_id, total])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 22,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "LEFT",
+    "preFilterCondition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$>$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "DOUBLE"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "15.3",
+        "type" : "DECIMAL(3, 1) NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "joinCondition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$<$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$CAST$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "DOUBLE"
+        } ],
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 5,
+        "type" : "INT"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`customers_t`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT NOT NULL"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              }, {
+                "name" : "city",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "state",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "zipcode",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ],
+              "primaryKey" : {
+                "name" : "PK_id",
+                "type" : "PRIMARY_KEY",
+                "columns" : [ "id" ]
+              }
+            },
+            "partitionKeys" : [ ]
+          }
+        }
+      },
+      "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` 
INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> 
NOT NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 1
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "lookupKeyContainsPrimaryKey" : true,
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, 
`id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), 
`state` VARCHAR(2147483647), `zipcode` INT>",
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.customers_t], 
joinType=[LeftOuterJoin], lookup=[id=customer_id], joinCondition=[(total > 
15.3)(CAST(total AS INTEGER) < age)], select=[order_id, customer_id, total, id, 
name, age, city, state, zipcode])"
+  }, {
+    "id" : 23,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 7,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 8,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` 
VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Calc(select=[order_id, total, id, name, age, city, state, 
zipcode])"
+  }, {
+    "id" : 24,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "age",
+              "dataType" : "INT"
+            }, {
+              "name" : "city",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "state",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "zipcode",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` 
VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[order_id, total, id, name, age, city, state, zipcode])"
+  } ],
+  "edges" : [ {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 22,
+    "target" : 23,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 23,
+    "target" : 24,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/savepoint/_metadata
new file mode 100644
index 00000000000..e755cc09068
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/plan/lookup-join-project-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/plan/lookup-join-project-pushdown.json
new file mode 100644
index 00000000000..91502b1858a
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/plan/lookup-join-project-pushdown.json
@@ -0,0 +1,248 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`orders_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "customer_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "proc_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, orders_t, project=[order_id, customer_id, total], 
metadata=[]]], fields=[order_id, customer_id, total])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "INNER",
+    "joinCondition" : null,
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`customers_t`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT NOT NULL"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              }, {
+                "name" : "city",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "state",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "zipcode",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ],
+              "primaryKey" : {
+                "name" : "PK_id",
+                "type" : "PRIMARY_KEY",
+                "columns" : [ "id" ]
+              }
+            },
+            "partitionKeys" : [ ]
+          }
+        },
+        "abilities" : [ {
+          "type" : "ProjectPushDown",
+          "projectedFields" : [ [ 0 ], [ 1 ], [ 3 ], [ 4 ], [ 5 ] ],
+          "producedType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), 
`city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT 
NULL"
+        }, {
+          "type" : "ReadingMetadata",
+          "metadataKeys" : [ ],
+          "producedType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), 
`city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT 
NULL"
+        } ]
+      },
+      "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), 
`city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT 
NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 1
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "lookupKeyContainsPrimaryKey" : true,
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, 
`id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), 
`state` VARCHAR(2147483647), `zipcode` INT>",
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.customers_t], 
joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, 
total, id, name, city, state, zipcode])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 7,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, 
`name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Calc(select=[order_id, total, id, name, city, state, 
zipcode])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "city",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "state",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "zipcode",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, 
`name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[order_id, total, id, name, city, state, zipcode])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/savepoint/_metadata
new file mode 100644
index 00000000000..6cda43d12c3
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/plan/lookup-join-retry-hint.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/plan/lookup-join-retry-hint.json
new file mode 100644
index 00000000000..6463fbf848e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/plan/lookup-join-retry-hint.json
@@ -0,0 +1,252 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 29,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`orders_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "customer_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "proc_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` 
DOUBLE> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, orders_t, project=[order_id, customer_id, total], 
metadata=[]]], fields=[order_id, customer_id, total])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 30,
+    "type" : "stream-exec-lookup-join_1",
+    "joinType" : "INNER",
+    "joinCondition" : null,
+    "temporalTable" : {
+      "lookupTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`customers_t`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "id",
+                "dataType" : "INT NOT NULL"
+              }, {
+                "name" : "name",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "age",
+                "dataType" : "INT"
+              }, {
+                "name" : "city",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "state",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "zipcode",
+                "dataType" : "INT"
+              } ],
+              "watermarkSpecs" : [ ],
+              "primaryKey" : {
+                "name" : "PK_id",
+                "type" : "PRIMARY_KEY",
+                "columns" : [ "id" ]
+              }
+            },
+            "partitionKeys" : [ ]
+          }
+        }
+      },
+      "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` 
INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> 
NOT NULL"
+    },
+    "lookupKeys" : {
+      "0" : {
+        "type" : "FieldRef",
+        "index" : 1
+      }
+    },
+    "projectionOnTemporalTable" : null,
+    "filterOnTemporalTable" : null,
+    "lookupKeyContainsPrimaryKey" : true,
+    "retryOptions" : {
+      "retry-predicate" : "lookup_miss",
+      "retry-strategy" : "FIXED_DELAY",
+      "fixed-delay" : 10000,
+      "max-attempts" : 3
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, 
`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` 
VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>",
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.customers_t], 
joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, 
total, id, name, age, city, state, zipcode], retry=[lookup_miss, FIXED_DELAY, 
10000ms, 3])"
+  }, {
+    "id" : 31,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 7,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 8,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, 
`name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Calc(select=[order_id, total, id, name, age, city, state, 
zipcode])"
+  }, {
+    "id" : 32,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "order_id",
+              "dataType" : "INT"
+            }, {
+              "name" : "total",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "age",
+              "dataType" : "INT"
+            }, {
+              "name" : "city",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "state",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "zipcode",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, 
`name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` 
VARCHAR(2147483647), `zipcode` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[order_id, total, id, name, age, city, state, zipcode])"
+  } ],
+  "edges" : [ {
+    "source" : 29,
+    "target" : 30,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 30,
+    "target" : 31,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 31,
+    "target" : 32,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/savepoint/_metadata
new file mode 100644
index 00000000000..1d7e42191d8
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/savepoint/_metadata
 differ

Reply via email to