This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 01cdc703ee6 [FLINK-24239] Event time temporal join should support 
values from array, map, row, etc. as join key (#24253)
01cdc703ee6 is described below

commit 01cdc703ee6fa56bdfdf799d016c0e882e9e5d99
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Thu Feb 8 15:26:16 2024 +0100

    [FLINK-24239] Event time temporal join should support values from array, 
map, row, etc. as join key (#24253)
---
 ...gicalCorrelateToJoinFromTemporalTableRule.scala | 151 +++---
 .../nodes/exec/stream/TemporalJoinRestoreTest.java |   2 +
 .../exec/stream/TemporalJoinTestPrograms.java      |  82 +++
 .../temporal-join-table-join-key-from-map.json     | 569 +++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 14977 bytes
 .../plan/temporal-join-table-join-nested-key.json  | 600 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 14973 bytes
 7 files changed, 1336 insertions(+), 68 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
index 217b9597561..25f1d29d8ea 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
@@ -26,11 +26,12 @@ import 
org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable, Table
 import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
 import org.apache.flink.table.sources.LookupableTableSource
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, 
RelOptUtil}
 import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.hep.{HepPlanner, HepRelVertex}
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.core.{CorrelationId, TableScan}
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.rex._
 
@@ -141,6 +142,30 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule(
       }
     case _ => false
   }
+
+  protected def decorrelate(
+      rexNode: RexNode,
+      leftRowType: RelDataType,
+      correlationId: CorrelationId): RexNode = {
+    rexNode.accept(new RexShuttle() {
+      // change correlate variable expression to normal RexInputRef (which is 
from left side)
+      override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+        fieldAccess.getReferenceExpr match {
+          case corVar: RexCorrelVariable =>
+            require(correlationId.equals(corVar.id))
+            val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+            RexInputRef.of(index, leftRowType)
+          case _ => super.visitFieldAccess(fieldAccess)
+        }
+      }
+
+      // update the field index from right side
+      override def visitInputRef(inputRef: RexInputRef): RexNode = {
+        val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+        new RexInputRef(rightIndex, inputRef.getType)
+      }
+    })
+  }
 }
 
 /**
@@ -161,24 +186,7 @@ abstract class 
LogicalCorrelateToJoinFromLookupTemporalTableRule(
     validateSnapshotInCorrelate(snapshot, correlate)
 
     val leftRowType = leftInput.getRowType
-    val joinCondition = filterCondition.accept(new RexShuttle() {
-      // change correlate variable expression to normal RexInputRef (which is 
from left side)
-      override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
-        fieldAccess.getReferenceExpr match {
-          case corVar: RexCorrelVariable =>
-            require(correlate.getCorrelationId.equals(corVar.id))
-            val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
-            RexInputRef.of(index, leftRowType)
-          case _ => super.visitFieldAccess(fieldAccess)
-        }
-      }
-
-      // update the field index from right side
-      override def visitInputRef(inputRef: RexInputRef): RexNode = {
-        val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
-        new RexInputRef(rightIndex, inputRef.getType)
-      }
-    })
+    val joinCondition = decorrelate(filterCondition, leftRowType, 
correlate.getCorrelationId)
 
     val builder = call.builder()
     builder.push(leftInput)
@@ -198,8 +206,8 @@ abstract class 
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
 
   protected def extractRightEventTimeInputRef(
       leftInput: RelNode,
-      snapshot: LogicalSnapshot): Option[RexNode] = {
-    val rightFields = snapshot.getRowType.getFieldList.asScala
+      rightInput: RelNode): Option[RexNode] = {
+    val rightFields = rightInput.getRowType.getFieldList.asScala
     val timeAttributeFields = rightFields.filter(
       f =>
         f.getType.isInstanceOf[TimeIndicatorRelDataType] &&
@@ -209,7 +217,7 @@ abstract class 
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
       val timeColIndex = leftInput.getRowType.getFieldCount +
         rightFields.indexOf(timeAttributeFields.get(0))
       val timeColDataType = timeAttributeFields.get(0).getType
-      val rexBuilder = snapshot.getCluster.getRexBuilder
+      val rexBuilder = rightInput.getCluster.getRexBuilder
       Some(rexBuilder.makeInputRef(timeColDataType, timeColIndex))
     } else {
       None
@@ -237,57 +245,32 @@ abstract class 
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
     val snapshot = getLogicalSnapshot(call)
 
     val leftRowType = leftInput.getRowType
-    val joinCondition = filterCondition.accept(new RexShuttle() {
-      // change correlate variable expression to normal RexInputRef (which is 
from left side)
-      override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
-        fieldAccess.getReferenceExpr match {
-          case corVar: RexCorrelVariable =>
-            require(correlate.getCorrelationId.equals(corVar.id))
-            val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
-            RexInputRef.of(index, leftRowType)
-          case _ => super.visitFieldAccess(fieldAccess)
-        }
-      }
-
-      // update the field index from right side
-      override def visitInputRef(inputRef: RexInputRef): RexNode = {
-        val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
-        new RexInputRef(rightIndex, inputRef.getType)
-      }
-    })
+    val joinCondition = decorrelate(filterCondition, leftRowType, 
correlate.getCorrelationId)
 
     validateSnapshotInCorrelate(snapshot, correlate)
 
     val rexBuilder = correlate.getCluster.getRexBuilder
-    val (leftJoinKey, rightJoinKey) = {
-      val relBuilder = call.builder()
-      relBuilder.push(leftInput)
-      relBuilder.push(snapshot)
-      val rewriteJoin = relBuilder.join(correlate.getJoinType, 
joinCondition).build()
-      val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition()
-      val leftJoinKey = joinInfo.leftKeys.map(i => 
rexBuilder.makeInputRef(leftInput, i))
-      val leftFieldCnt = leftInput.getRowType.getFieldCount
-      val rightJoinKey = joinInfo.rightKeys.map(
-        i => {
-          val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType
-          rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i)
-        })
-      if (leftJoinKey.length == 0 || rightJoinKey.length == 0) {
-        throw new ValidationException(
-          "Currently the join key in Temporal Table Join " +
-            "can not be empty.")
-      }
-      (leftJoinKey, rightJoinKey)
+    val relBuilder = call.builder()
+    relBuilder.push(leftInput)
+    relBuilder.push(snapshot)
+    val nonPushedJoin =
+      relBuilder.join(correlate.getJoinType, 
joinCondition).build().asInstanceOf[LogicalJoin]
+    val rewriteJoin = RelOptUtil.pushDownJoinConditions(nonPushedJoin, 
relBuilder)
+    val actualJoin = rewriteJoin match {
+      case _: LogicalJoin => rewriteJoin.asInstanceOf[LogicalJoin]
+      case _ => rewriteJoin.getInput(0).asInstanceOf[LogicalJoin]
     }
 
-    val snapshotTimeInputRef = extractSnapshotTimeInputRef(leftInput, snapshot)
+    val (leftJoinKey, rightJoinKey) = extractJoinKeys(actualJoin)
+
+    val snapshotTimeInputRef = extractSnapshotTimeInputRef(actualJoin.getLeft, 
snapshot)
       .getOrElse(
         throw new ValidationException(
           "Temporal Table Join requires time " +
             "attribute in the left table, but no time attribute found."))
 
     val temporalCondition = if (isRowTimeTemporalTableJoin(snapshot)) {
-      val rightTimeInputRef = extractRightEventTimeInputRef(leftInput, 
snapshot)
+      val rightTimeInputRef = 
extractRightEventTimeInputRef(actualJoin.getLeft, actualJoin.getRight)
       if (rightTimeInputRef.isEmpty || 
!isRowtimeIndicatorType(rightTimeInputRef.get.getType)) {
         throw new ValidationException(
           "Event-Time Temporal Table Join requires both" +
@@ -323,15 +306,47 @@ abstract class 
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
     }
 
     val builder = call.builder()
-    val condition = builder.and(joinCondition, temporalCondition)
-
-    builder.push(leftInput)
-    builder.push(snapshot)
-    builder.join(correlate.getJoinType, condition)
-    val temporalJoin = builder.build()
+    val condition = builder.and(actualJoin.getCondition, temporalCondition)
+
+    val joinWithTemporalCondition = actualJoin.copy(
+      actualJoin.getTraitSet,
+      condition,
+      actualJoin.getLeft,
+      actualJoin.getRight,
+      actualJoin.getJoinType,
+      actualJoin.isSemiJoinDone)
+
+    val temporalJoin = if (actualJoin != rewriteJoin) {
+      rewriteJoin.replaceInput(0, joinWithTemporalCondition)
+      rewriteJoin
+    } else {
+      joinWithTemporalCondition
+    }
     call.transformTo(temporalJoin)
   }
 
+  private def extractJoinKeys(actualJoin: LogicalJoin): (Seq[RexNode], 
Seq[RexNode]) = {
+
+    val joinInfo = actualJoin.analyzeCondition()
+    val leftInput = actualJoin.getInput(0)
+    val rightInput = actualJoin.getInput(1)
+    val rexBuilder = actualJoin.getCluster.getRexBuilder
+
+    val leftJoinKey = joinInfo.leftKeys.map(i => 
rexBuilder.makeInputRef(leftInput, i))
+    val leftFieldCnt = leftInput.getRowType.getFieldCount
+    val rightJoinKey = joinInfo.rightKeys.map(
+      i => {
+        val rightKeyType = rightInput.getRowType.getFieldList.get(i).getType
+        rexBuilder.makeInputRef(rightKeyType, leftFieldCnt + i)
+      })
+    if (leftJoinKey.isEmpty || rightJoinKey.isEmpty) {
+      throw new ValidationException(
+        "Currently the join key in Temporal Table Join " +
+          "can not be empty.")
+    }
+    (leftJoinKey, rightJoinKey)
+  }
+
   private def isRowTimeTemporalTableJoin(snapshot: LogicalSnapshot): Boolean =
     snapshot.getPeriod.getType match {
       case t: TimeIndicatorRelDataType if t.isEventTime => true
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
index 44f48fc24c3..a8584ac951f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
@@ -35,6 +35,8 @@ public class TemporalJoinRestoreTest extends RestoreTestBase {
     public List<TableTestProgram> programs() {
         return Arrays.asList(
                 TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN,
+                TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY,
+                TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP,
                 TemporalJoinTestPrograms.TEMPORAL_JOIN_TEMPORAL_FUNCTION);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
index 883d628fbd9..ed83169c74a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
@@ -23,6 +23,9 @@ import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.apache.flink.table.api.Expressions.$;
 
 /** {@link TableTestProgram} definitions for testing {@link 
StreamExecTemporalJoin}. */
@@ -45,6 +48,49 @@ public class TemporalJoinTestPrograms {
                             Row.of(1L, "USD", "2020-10-10 00:00:58"))
                     .build();
 
+    static final SourceTestStep ORDERS_WITH_NESTED_ID =
+            SourceTestStep.newBuilder("OrdersNestedId")
+                    .addSchema(
+                            "amount bigint",
+                            "nested_row ROW<currency STRING>",
+                            "nested_map MAP<STRING NOT NULL, STRING>",
+                            "order_time STRING",
+                            "rowtime as TO_TIMESTAMP(order_time) ",
+                            "WATERMARK FOR rowtime AS rowtime")
+                    .producedBeforeRestore(
+                            Row.of(
+                                    2L,
+                                    Row.of("Euro"),
+                                    mapOf("currency", "Euro"),
+                                    "2020-10-10 00:00:42"),
+                            Row.of(
+                                    1L,
+                                    Row.of("usd"),
+                                    mapOf("currency", "USD"),
+                                    "2020-10-10 00:00:43"),
+                            Row.of(
+                                    50L,
+                                    Row.of("Yen"),
+                                    mapOf("currency", "Yen"),
+                                    "2020-10-10 00:00:44"),
+                            Row.of(
+                                    3L,
+                                    Row.of("Euro"),
+                                    mapOf("currency", "Euro"),
+                                    "2020-10-10 00:00:45"))
+                    .producedAfterRestore(
+                            Row.of(
+                                    1L,
+                                    Row.of("Euro"),
+                                    mapOf("currency", "Euro"),
+                                    "2020-10-10 00:00:58"),
+                            Row.of(
+                                    1L,
+                                    Row.of("usd"),
+                                    mapOf("currency", "USD"),
+                                    "2020-10-10 00:00:58"))
+                    .build();
+
     static final SourceTestStep RATES =
             SourceTestStep.newBuilder("RatesHistory")
                     .addSchema(
@@ -84,6 +130,36 @@ public class TemporalJoinTestPrograms {
                                     + "ON o.currency = r.currency ")
                     .build();
 
+    static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY =
+            TableTestProgram.of(
+                            "temporal-join-table-join-nested-key",
+                            "validates temporal join with a table when the 
join keys comes from a nested row")
+                    .setupTableSource(ORDERS_WITH_NESTED_ID)
+                    .setupTableSource(RATES)
+                    .setupTableSink(AMOUNTS)
+                    .runSql(
+                            "INSERT INTO MySink "
+                                    + "SELECT amount * r.rate "
+                                    + "FROM OrdersNestedId AS o "
+                                    + "JOIN RatesHistory FOR SYSTEM_TIME AS OF 
o.rowtime AS r "
+                                    + "ON (case when o.nested_row.currency = 
'usd' then upper(o.nested_row.currency) ELSE o.nested_row.currency END) = 
r.currency ")
+                    .build();
+
+    static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP =
+            TableTestProgram.of(
+                            "temporal-join-table-join-key-from-map",
+                            "validates temporal join with a table when the 
join key comes from a map value")
+                    .setupTableSource(ORDERS_WITH_NESTED_ID)
+                    .setupTableSource(RATES)
+                    .setupTableSink(AMOUNTS)
+                    .runSql(
+                            "INSERT INTO MySink "
+                                    + "SELECT amount * r.rate "
+                                    + "FROM OrdersNestedId AS o "
+                                    + "JOIN RatesHistory FOR SYSTEM_TIME AS OF 
o.rowtime AS r "
+                                    + "ON o.nested_map['currency'] = 
r.currency ")
+                    .build();
+
     static final TableTestProgram TEMPORAL_JOIN_TEMPORAL_FUNCTION =
             TableTestProgram.of(
                             "temporal-join-temporal-function",
@@ -100,4 +176,10 @@ public class TemporalJoinTestPrograms {
                                     + "LATERAL TABLE (Rates(o.rowtime)) AS r "
                                     + "WHERE o.currency = r.currency ")
                     .build();
+
+    private static Map<String, String> mapOf(String key, String value) {
+        final HashMap<String, String> map = new HashMap<>();
+        map.put(key, value);
+        return map;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/plan/temporal-join-table-join-key-from-map.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/plan/temporal-join-table-join-key-from-map.json
new file mode 100644
index 00000000000..130b8af8d4c
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/plan/temporal-join-table-join-key-from-map.json
@@ -0,0 +1,569 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 24,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`OrdersNestedId`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "amount",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "nested_row",
+              "dataType" : "ROW<`currency` VARCHAR(2147483647)>"
+            }, {
+              "name" : "nested_map",
+              "dataType" : "MAP<VARCHAR(2147483647) NOT NULL, 
VARCHAR(2147483647)>"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 3,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`order_time`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 4,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 3 ], [ 2 ] ],
+        "producedType" : "ROW<`amount` BIGINT, `order_time` 
VARCHAR(2147483647), `nested_map` MAP<VARCHAR(2147483647) NOT NULL, 
VARCHAR(2147483647)>> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`amount` BIGINT, `order_time` 
VARCHAR(2147483647), `nested_map` MAP<VARCHAR(2147483647) NOT NULL, 
VARCHAR(2147483647)>> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647), 
`nested_map` MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, OrdersNestedId, project=[amount, order_time, nested_map], 
metadata=[]]], fields=[amount, order_time, nested_map])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 25,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`amount` BIGINT, `rowtime` TIMESTAMP(3), `nested_map` 
MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>>",
+    "description" : "Calc(select=[amount, TO_TIMESTAMP(order_time) AS rowtime, 
nested_map])"
+  }, {
+    "id" : 26,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 1,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "nested_map",
+        "fieldType" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>"
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 27,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      }
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$ITEM$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "currency",
+        "type" : "CHAR(8) NOT NULL"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "$f5",
+        "fieldType" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "Calc(select=[amount, rowtime, ITEM(nested_map, 
'currency') AS $f5])"
+  }, {
+    "id" : 28,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 2 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "$f5",
+        "fieldType" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[$f5]])"
+  }, {
+    "id" : 29,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "currency",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "rate",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "rate_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`rate_time`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ],
+            "primaryKey" : {
+              "name" : "PK_currency",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "currency" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` 
BIGINT, `rate_time` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rate_time])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 30,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` 
BIGINT, `rowtime` TIMESTAMP(3)>",
+    "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS 
rowtime])"
+  }, {
+    "id" : 31,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 32,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[currency]])"
+  }, {
+    "id" : 33,
+    "type" : "stream-exec-temporal-join_1",
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 2 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "isTemporalFunctionJoin" : false,
+    "leftTimeAttributeIndex" : 1,
+    "rightTimeAttributeIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "$f5",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime0",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "TemporalJoin(joinType=[InnerJoin], where=[(($f5 = 
currency) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency), __TEMPORAL_JOIN_LEFT_KEY($f5), 
__TEMPORAL_JOIN_RIGHT_KEY(currency)))], select=[amount, rowtime, $f5, currency, 
rate, rowtime0])"
+  }, {
+    "id" : 34,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$*$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
+  }, {
+    "id" : 35,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "amount",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[EXPR$0])"
+  } ],
+  "edges" : [ {
+    "source" : 24,
+    "target" : 25,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 25,
+    "target" : 26,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 26,
+    "target" : 27,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 27,
+    "target" : 28,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 29,
+    "target" : 30,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 30,
+    "target" : 31,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 31,
+    "target" : 32,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 28,
+    "target" : 33,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 32,
+    "target" : 33,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 33,
+    "target" : 34,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 34,
+    "target" : 35,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/savepoint/_metadata
new file mode 100644
index 00000000000..ac6d04137e8
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/plan/temporal-join-table-join-nested-key.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/plan/temporal-join-table-join-nested-key.json
new file mode 100644
index 00000000000..679b3835273
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/plan/temporal-join-table-join-nested-key.json
@@ -0,0 +1,600 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 12,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`OrdersNestedId`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "amount",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "nested_row",
+              "dataType" : "ROW<`currency` VARCHAR(2147483647)>"
+            }, {
+              "name" : "nested_map",
+              "dataType" : "MAP<VARCHAR(2147483647) NOT NULL, 
VARCHAR(2147483647)>"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 3,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`order_time`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 4,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 3 ], [ 1 ] ],
+        "producedType" : "ROW<`amount` BIGINT, `order_time` 
VARCHAR(2147483647), `nested_row` ROW<`currency` VARCHAR(2147483647)>> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`amount` BIGINT, `order_time` 
VARCHAR(2147483647), `nested_row` ROW<`currency` VARCHAR(2147483647)>> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647), 
`nested_row` ROW<`currency` VARCHAR(2147483647)>>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, OrdersNestedId, project=[amount, order_time, nested_row], 
metadata=[]]], fields=[amount, order_time, nested_row])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "ROW<`currency` VARCHAR(2147483647)>"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`amount` BIGINT, `rowtime` TIMESTAMP(3), `nested_row` 
ROW<`currency` VARCHAR(2147483647)>>",
+    "description" : "Calc(select=[amount, TO_TIMESTAMP(order_time) AS rowtime, 
nested_row])"
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 1,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "nested_row",
+        "fieldType" : "ROW<`currency` VARCHAR(2147483647)>"
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      }
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CASE$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$=$1",
+        "operands" : [ {
+          "kind" : "FIELD_ACCESS",
+          "name" : "currency",
+          "expr" : {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 2,
+            "type" : "ROW<`currency` VARCHAR(2147483647)>"
+          }
+        }, {
+          "kind" : "LITERAL",
+          "value" : "usd",
+          "type" : "VARCHAR(2147483647) NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "internalName" : "$UPPER$1",
+        "operands" : [ {
+          "kind" : "FIELD_ACCESS",
+          "name" : "currency",
+          "expr" : {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 2,
+            "type" : "ROW<`currency` VARCHAR(2147483647)>"
+          }
+        } ],
+        "type" : "VARCHAR(2147483647)"
+      }, {
+        "kind" : "FIELD_ACCESS",
+        "name" : "currency",
+        "expr" : {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "ROW<`currency` VARCHAR(2147483647)>"
+        }
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "$f5",
+        "fieldType" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "Calc(select=[amount, rowtime, CASE((nested_row.currency = 
'usd'), UPPER(nested_row.currency), nested_row.currency) AS $f5])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 2 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "$f5",
+        "fieldType" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[$f5]])"
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "currency",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "rate",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "rate_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`rate_time`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ],
+            "primaryKey" : {
+              "name" : "PK_currency",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "currency" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` 
BIGINT, `rate_time` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rate_time])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` 
BIGINT, `rowtime` TIMESTAMP(3)>",
+    "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS 
rowtime])"
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[currency]])"
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-temporal-join_1",
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 2 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "isTemporalFunctionJoin" : false,
+    "leftTimeAttributeIndex" : 1,
+    "rightTimeAttributeIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "$f5",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime0",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "TemporalJoin(joinType=[InnerJoin], where=[(($f5 = 
currency) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency), __TEMPORAL_JOIN_LEFT_KEY($f5), 
__TEMPORAL_JOIN_RIGHT_KEY(currency)))], select=[amount, rowtime, $f5, currency, 
rate, rowtime0])"
+  }, {
+    "id" : 22,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$*$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
+  }, {
+    "id" : 23,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "amount",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[EXPR$0])"
+  } ],
+  "edges" : [ {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 22,
+    "target" : 23,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/savepoint/_metadata
new file mode 100644
index 00000000000..c70770ac5db
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/savepoint/_metadata
 differ

Reply via email to