This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b142860352721fe65b8fe7e4106cbcd2059714e5
Author: Timo Walther <twal...@apache.org>
AuthorDate: Wed Aug 10 15:30:50 2022 +0200

    [FLINK-28861][table] Make UID generation behavior configurable and plan-only by default
    
    Before this commit, due to changes for FLIP-190, every operator generated by the
    planner got a UID assigned. However, the UID was based on a static counter that
    might return different results depending on the environment. Thus, UIDs were not
    deterministic and made stateful restores impossible, e.g., when upgrading from
    1.15.0 to 1.15.1. This PR restores the old pre-1.15 behavior for the regular
    Table API: it only adds UIDs if the operator has been created from a compiled
    plan. A compiled plan makes the UIDs static and thus deterministic.
    
    table.exec.uid.generation=ALWAYS exists for backwards compatibility and can make
    stateful upgrades possible, on a best-effort basis, even with invalid UIDs.
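    
    As illustration (not part of the commit itself), a minimal sketch of how the
    option could be set from the Table API, assuming a streaming TableEnvironment;
    the option and enum are the ones introduced by this commit:
    
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.api.config.ExecutionConfigOptions;
    
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Opt back into always-assigned UIDs, e.g. for best-effort stateful upgrades:
        env.getConfig()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION,
                        ExecutionConfigOptions.UidGeneration.ALWAYS);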
---
 .../generated/execution_config_configuration.html  |  6 ++
 .../connector/file/table/stream/StreamingSink.java | 59 +++++++------
 .../connectors/hive/HiveTableSourceITCase.java     |  6 +-
 .../flink/table/connector/ProviderContext.java     |  4 +-
 .../table/api/config/ExecutionConfigOptions.java   | 73 ++++++++++++++--
 .../table/planner/plan/nodes/exec/ExecNode.java    |  6 ++
 .../planner/plan/nodes/exec/ExecNodeBase.java      | 43 ++++++----
 .../planner/plan/nodes/exec/ExecNodeConfig.java    | 45 +++++++++-
 .../plan/nodes/exec/ExecNodeGraphGenerator.java    | 10 +--
 .../plan/nodes/exec/ExecNodeTranslator.java        |  2 +-
 .../plan/nodes/exec/common/CommonExecMatch.java    |  2 +-
 .../exec/common/CommonExecPythonCorrelate.java     | 12 +--
 .../plan/nodes/exec/common/CommonExecSink.java     | 18 ++--
 .../exec/common/CommonExecTableSourceScan.java     |  6 +-
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       |  8 +-
 .../nodes/exec/stream/StreamExecIntervalJoin.java  |  4 +-
 .../plan/nodes/exec/stream/StreamExecMatch.java    |  3 +-
 .../StreamExecPythonGroupWindowAggregate.java      | 15 ++--
 .../table/planner/delegation/PlannerBase.scala     | 12 +--
 .../table/planner/delegation/StreamPlanner.scala   |  2 +-
 .../plan/nodes/physical/FlinkPhysicalRel.scala     | 16 ++++
 .../table/api/internal/CompiledPlanUtils.java      |  4 +-
 .../codegen/LongAdaptiveHashJoinGeneratorTest.java |  5 +-
 .../plan/nodes/exec/TestingBatchExecNode.java      |  5 ++
 .../plan/nodes/exec/TransformationsTest.java       | 98 ++++++++++++++++------
 .../MultipleInputNodeCreationProcessorTest.java    |  3 +-
 .../planner/utils/InternalConfigOptionsTest.java   |  3 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  4 +-
 28 files changed, 347 insertions(+), 127 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index d2974df5ac6..d6292edc1b4 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -160,6 +160,12 @@ By default no operator is disabled.</td>
             <td>Duration</td>
            <td>Specifies a minimum time interval for how long idle state (i.e. state which was not updated), will be retained. State will never be cleared until it was idle for less than the minimum time, and will be cleared at some time after it was idle. Default is never clean-up the state. NOTE: Cleaning up state requires additional overhead for bookkeeping. Default value is 0, which means that it will never clean up state.</td>
         </tr>
+        <tr>
+            <td><h5>table.exec.uid.generation</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">PLAN_ONLY</td>
+            <td><p>Enum</p></td>
+            <td>In order to remap state to operators during a restore, it is required that the pipeline's streaming transformations get a UID assigned.<br />The planner can generate and assign explicit UIDs. If no UIDs have been set by the planner, the UIDs will be auto-generated by lower layers that can take the complete topology into account for uniqueness of the IDs. See the DataStream API for more information.<br />This configuration option is for experts only and the default should  [...]
+        </tr>
         <tr>
            <td><h5>table.exec.window-agg.buffer-size-limit</h5><br> <span class="label label-primary">Batch</span></td>
             <td style="word-wrap: break-word;">100000</td>
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java
index 140f1596c8f..ee86f08654d 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java
@@ -71,13 +71,15 @@ public class StreamingSink {
             Configuration conf) {
         StreamingFileWriter<T> fileWriter =
                 new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, partitionKeys, conf);
-        return inputStream
-                .transform(
-                        StreamingFileWriter.class.getSimpleName(),
-                        TypeInformation.of(PartitionCommitInfo.class),
-                        fileWriter)
-                .uid(providerContext.generateUid("streaming-writer").get())
-                .setParallelism(parallelism);
+        SingleOutputStreamOperator<PartitionCommitInfo> writerStream =
+                inputStream
+                        .transform(
+                                StreamingFileWriter.class.getSimpleName(),
+                                TypeInformation.of(PartitionCommitInfo.class),
+                                fileWriter)
+                        .setParallelism(parallelism);
+        providerContext.generateUid("streaming-writer").ifPresent(writerStream::uid);
+        return writerStream;
     }
 
     /**
@@ -104,21 +106,24 @@ public class StreamingSink {
 
         CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);
 
-        SingleOutputStreamOperator<CoordinatorOutput> coordinatorOp =
+        SingleOutputStreamOperator<CoordinatorInput> writerStream =
                 inputStream
                         .transform(
                                 "streaming-writer",
                                 TypeInformation.of(CoordinatorInput.class),
                                 writer)
-                        .uid(providerContext.generateUid("streaming-writer").get())
-                        .setParallelism(parallelism)
+                        .setParallelism(parallelism);
+        providerContext.generateUid("streaming-writer").ifPresent(writerStream::uid);
+
+        SingleOutputStreamOperator<CoordinatorOutput> coordinatorStream =
+                writerStream
                         .transform(
                                 "compact-coordinator",
                                 TypeInformation.of(CoordinatorOutput.class),
                                 coordinator)
-                        .uid(providerContext.generateUid("compact-coordinator").get())
                         .setParallelism(1)
                         .setMaxParallelism(1);
+        providerContext.generateUid("compact-coordinator").ifPresent(coordinatorStream::uid);
 
         CompactWriter.Factory<T> writerFactory =
                 CompactBucketWriter.factory(
@@ -128,14 +133,17 @@ public class StreamingSink {
         CompactOperator<T> compacter =
                 new CompactOperator<>(fsSupplier, readFactory, writerFactory);
 
-        return coordinatorOp
-                .broadcast()
-                .transform(
-                        "compact-operator",
-                        TypeInformation.of(PartitionCommitInfo.class),
-                        compacter)
-                .uid(providerContext.generateUid("compact-operator").get())
-                .setParallelism(parallelism);
+        SingleOutputStreamOperator<PartitionCommitInfo> operatorStream =
+                coordinatorStream
+                        .broadcast()
+                        .transform(
+                                "compact-operator",
+                                TypeInformation.of(PartitionCommitInfo.class),
+                                compacter)
+                        .setParallelism(parallelism);
+        providerContext.generateUid("compact-operator").ifPresent(operatorStream::uid);
+
+        return operatorStream;
     }
 
     /**
@@ -156,17 +164,18 @@ public class StreamingSink {
             PartitionCommitter committer =
                     new PartitionCommitter(
                             locationPath, identifier, partitionKeys, msFactory, fsFactory, options);
-            stream =
+            SingleOutputStreamOperator<Void> committerStream =
                     writer.transform(
                                     PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
-                            .uid(providerContext.generateUid("partition-committer").get())
                             .setParallelism(1)
                             .setMaxParallelism(1);
+            providerContext.generateUid("partition-committer").ifPresent(committerStream::uid);
+            stream = committerStream;
         }
 
-        return stream.addSink(new DiscardingSink<>())
-                .uid(providerContext.generateUid("discarding-sink").get())
-                .name("end")
-                .setParallelism(1);
+        DataStreamSink<?> discardingSink =
+                stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
+        providerContext.generateUid("discarding-sink").ifPresent(discardingSink::uid);
+        return discardingSink;
     }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index d32e6694169..99540c947c6 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -519,7 +519,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
         RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
         ExecNode<?> execNode =
-                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)))
+                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)), false)
                         .getRootNodes()
                         .get(0);
         Transformation<?> transformation = execNode.translateToPlan(planner);
@@ -550,7 +550,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
         RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
         ExecNode<?> execNode =
-                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)))
+                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)), false)
                         .getRootNodes()
                         .get(0);
         Transformation<?> transformation =
@@ -584,7 +584,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
         RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
         ExecNode<?> execNode =
-                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)))
+                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)), false)
                         .getRootNodes()
                         .get(0);
         Transformation<?> transformation =
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/ProviderContext.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/ProviderContext.java
index 46c9cfa206c..c36aa683aad 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/ProviderContext.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/ProviderContext.java
@@ -38,8 +38,8 @@ public interface ProviderContext {
      * The {@code name} must be unique within the provider implementation. The framework will make
      * sure that the name is unique for the entire topology.
      *
-     * <p>This method returns empty if an identifier cannot be generated, i.e. because the job is in
-     * batch mode.
+     * <p>This method returns empty if an identifier cannot be generated, i.e., because the job is
+     * in batch mode, or UIDs cannot be guaranteed to be unique.
      */
     Optional<String> generateUid(String name);
 }
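
As context for the Optional contract above, a hedged sketch of how a provider implementation might consume generateUid; the helper class and method names are illustrative only, mirroring the ifPresent pattern used in StreamingSink above:

    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.table.connector.ProviderContext;

    final class UidHelper {
        // Assigns the framework-provided UID only when one is available; in batch
        // mode (or when uniqueness cannot be guaranteed) the sink stays untouched
        // and lower layers auto-generate a UID from the topology.
        static <T> DataStreamSink<T> applyUid(
                ProviderContext context, String name, DataStreamSink<T> sink) {
            context.generateUid(name).ifPresent(sink::uid);
            return sink;
        }
    }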
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 474fc4f9c24..2f449dfcf6b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -488,6 +488,7 @@ public class ExecutionConfigOptions {
                                             + "all changes to downstream just 
like when the mini-batch is "
                                             + "not enabled.");
 
+    /** @deprecated Use {@link #TABLE_EXEC_UID_GENERATION} instead. */
     @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
     @Deprecated
     public static final ConfigOption<Boolean> TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS =
@@ -495,11 +496,38 @@ public class ExecutionConfigOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "In Flink 1.15 Transformation UIDs are generated deterministically starting from the metadata available after the planning phase. "
-                                    + "This new behaviour allows a safe restore of persisted plan, remapping the plan execution graph to the correct operators state. "
-                                    + "Setting this flag to true enables the previous \"legacy\" behavior, which is generating uids from the Transformation graph topology. "
-                                    + "We strongly suggest to keep this flag disabled, as this flag is going to be removed in the next releases. "
-                                    + "If you have a pipeline relying on the old behavior, please create a new pipeline and regenerate the operators state.");
+                            "This flag has been replaced by table.exec.uid.generation. Use the enum "
+                                    + "value DISABLED to restore legacy behavior. However, the new "
+                                    + "default value should be sufficient for most use cases as "
+                                    + "only pipelines from compiled plans get UIDs assigned.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<UidGeneration> TABLE_EXEC_UID_GENERATION =
+            key("table.exec.uid.generation")
+                    .enumType(UidGeneration.class)
+                    .defaultValue(UidGeneration.PLAN_ONLY)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "In order to remap state to operators during a restore, "
+                                                    + "it is required that the pipeline's streaming "
+                                                    + "transformations get a UID assigned.")
+                                    .linebreak()
+                                    .text(
+                                            "The planner can generate and assign explicit UIDs. If no "
+                                                    + "UIDs have been set by the planner, the UIDs will "
+                                                    + "be auto-generated by lower layers that can take "
+                                                    + "the complete topology into account for uniqueness "
+                                                    + "of the IDs. See the DataStream API for more information.")
+                                    .linebreak()
+                                    .text(
+                                            "This configuration option is for experts only and the default "
+                                                    + "should be sufficient for most use cases. By default, "
+                                                    + "only pipelines created from a persisted compiled plan will "
+                                                    + "get UIDs assigned explicitly. Thus, these pipelines can "
+                                                    + "be arbitrarily moved around within the same topology without "
+                                                    + "affecting the stable UIDs.")
+                                    .build());
 
     // ------------------------------------------------------------------------------------------
     // Enum option types
@@ -622,4 +650,39 @@ public class ExecutionConfigOptions {
             return enabled;
         }
     }
+
+    /**
+     * Strategy for generating transformation UIDs for remapping state to operators during restore.
+     */
+    @PublicEvolving
+    public enum UidGeneration implements DescribedEnum {
+        PLAN_ONLY(
+                text(
+                        "Sets UIDs on streaming transformations if and only if the pipeline definition "
+                                + "comes from a compiled plan. Pipelines that have been constructed in "
+                                + "the API without a compilation step will not set an explicit UID as "
+                                + "it might not be stable across multiple translations.")),
+        ALWAYS(
+                text(
+                        "Always sets UIDs on streaming transformations. This strategy is for experts only! "
+                                + "Pipelines that have been constructed in the API without a compilation "
+                                + "step might not be able to be restored properly. The UID generation "
+                                + "depends on previously declared pipelines (potentially across jobs "
+                                + "if the same JVM is used). Thus, a stable environment must be ensured. "
+                                + "Pipeline definitions that come from a compiled plan are safe to use.")),
+
+        DISABLED(text("No explicit UIDs will be set."));
+
+        private final InlineElement description;
+
+        UidGeneration(InlineElement description) {
+            this.description = description;
+        }
+
+        @Internal
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
 }
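
To make the PLAN_ONLY default concrete, a sketch of the workflow that does get explicit UIDs (compilePlanSql/CompiledPlan are existing public API; the env variable and the SQL statement are assumed for the example):

    import org.apache.flink.table.api.CompiledPlan;

    // Compiling persists static, hence deterministic, UIDs in the plan ...
    CompiledPlan plan = env.compilePlanSql("INSERT INTO snk SELECT * FROM src");
    // ... so a pipeline executed from the (restored) plan can remap state safely.
    plan.execute();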
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
index d2b06fd5342..dc18eb58a5e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
@@ -116,4 +116,10 @@ public interface ExecNode<T> extends ExecNodeTranslator<T> {
      * @param visitor ExecNodeVisitor.
      */
     void accept(ExecNodeVisitor visitor);
+
+    /**
+     * Declares whether the node has been created as part of a plan compilation. Some translation
+     * properties might be impacted by this (e.g. UID generation for transformations).
+     */
+    void setCompiled(boolean isCompiled);
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 843abc2d992..0e949aa666b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
 import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
 import org.apache.flink.table.types.logical.LogicalType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JacksonInject;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
@@ -49,6 +50,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class ExecNodeBase<T> implements ExecNode<T> {
 
+    /**
+     * The default value of this flag is false. Other cases must set this flag accordingly via
+     * {@link #setCompiled(boolean)}. It is not exposed via a constructor arg to avoid complex
+     * constructor overloading for all {@link ExecNode}s. However, during deserialization this flag
+     * will always be set to true.
+     */
+    @JacksonInject("isDeserialize")
+    private boolean isCompiled;
+
     private final String description;
 
     private final LogicalType outputType;
@@ -147,8 +157,10 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
             transformation =
                     translateToPlanInternal(
                             (PlannerBase) planner,
-                            new ExecNodeConfig(
-                                    ((PlannerBase) planner).getTableConfig(), persistedConfig));
+                            ExecNodeConfig.of(
+                                    ((PlannerBase) planner).getTableConfig(),
+                                    persistedConfig,
+                                    isCompiled));
             if (this instanceof SingleTransformationTranslator) {
                 if (inputsContainSingleton()) {
                     transformation.setParallelism(1);
@@ -159,6 +171,16 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
         return transformation;
     }
 
+    @Override
+    public void accept(ExecNodeVisitor visitor) {
+        visitor.visit(this);
+    }
+
+    @Override
+    public void setCompiled(boolean compiled) {
+        isCompiled = compiled;
+    }
+
     /**
      * Internal method, translates this node into a Flink operator.
      *
@@ -171,11 +193,6 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
     protected abstract Transformation<T> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config);
 
-    @Override
-    public void accept(ExecNodeVisitor visitor) {
-        visitor.visit(this);
-    }
-
     /** Whether singleton distribution is required. */
     protected boolean inputsContainSingleton() {
         return getInputProperties().stream()
@@ -203,13 +220,11 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
     }
 
     protected TransformationMetadata createTransformationMeta(
-            String operatorName, ReadableConfig config) {
-        if (ExecNodeMetadataUtil.isUnsupported(this.getClass())
-                || config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
+            String operatorName, ExecNodeConfig config) {
+        if (ExecNodeMetadataUtil.isUnsupported(this.getClass()) || !config.shouldSetUid()) {
             return new TransformationMetadata(
                     createTransformationName(config), createTransformationDescription(config));
         } else {
-            // Only classes supporting metadata util need to set the uid
             return new TransformationMetadata(
                     createTransformationUid(operatorName),
                     createTransformationName(config),
@@ -218,14 +233,12 @@ public abstract class ExecNodeBase<T> implements 
ExecNode<T> {
     }
 
     protected TransformationMetadata createTransformationMeta(
-            String operatorName, String detailName, String simplifiedName, ReadableConfig config) {
+            String operatorName, String detailName, String simplifiedName, ExecNodeConfig config) {
         final String name = createFormattedTransformationName(detailName, simplifiedName, config);
         final String desc = createFormattedTransformationDescription(detailName, config);
-        if (ExecNodeMetadataUtil.isUnsupported(this.getClass())
-                || config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
+        if (ExecNodeMetadataUtil.isUnsupported(this.getClass()) || !config.shouldSetUid()) {
             return new TransformationMetadata(name, desc);
         } else {
-            // Only classes supporting metadata util need to set the uid
             return new TransformationMetadata(createTransformationUid(operatorName), name, desc);
         }
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
index f56165f66c0..34f7efd0e50 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
@@ -19,11 +19,12 @@
 package org.apache.flink.table.planner.plan.nodes.exec;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 
 import java.util.Optional;
@@ -33,6 +34,10 @@ import java.util.Optional;
 * ExecNodeBase#getPersistedConfig()} configuration. The persisted configuration of the {@link
 * ExecNode} which is deserialized from the JSON plan has precedence over the {@link
  * PlannerBase#getTableConfig()}.
+ *
+ * <p>This class is intended to contain additional context information for {@link ExecNode}
+ * translation such as {@link #shouldSetUid()} or helper methods for accessing configuration such as
+ * {@link #getStateRetentionTime()}.
  */
 @Internal
 public final class ExecNodeConfig implements ReadableConfig {
@@ -41,10 +46,22 @@ public final class ExecNodeConfig implements ReadableConfig {
 
     private final ReadableConfig nodeConfig;
 
-    @VisibleForTesting
-    public ExecNodeConfig(TableConfig tableConfig, ReadableConfig nodeConfig) {
+    private final boolean isCompiled;
+
+    private ExecNodeConfig(
+            ReadableConfig tableConfig, ReadableConfig nodeConfig, boolean isCompiled) {
         this.nodeConfig = nodeConfig;
         this.tableConfig = tableConfig;
+        this.isCompiled = isCompiled;
+    }
+
+    static ExecNodeConfig of(
+            TableConfig tableConfig, ReadableConfig nodeConfig, boolean isCompiled) {
+        return new ExecNodeConfig(tableConfig, nodeConfig, isCompiled);
+    }
+
+    public static ExecNodeConfig ofNodeConfig(ReadableConfig nodeConfig, boolean isCompiled) {
+        return new ExecNodeConfig(new Configuration(), nodeConfig, isCompiled);
     }
 
     @Override
@@ -65,4 +82,26 @@ public final class ExecNodeConfig implements ReadableConfig {
     public long getStateRetentionTime() {
         return get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis();
     }
+
+    /** @return Whether the {@link ExecNode} translation happens as part of a plan compilation. */
+    public boolean isCompiled() {
+        return isCompiled;
+    }
+
+    /** @return Whether transformations should set a UID. */
+    public boolean shouldSetUid() {
+        final UidGeneration uidGeneration = get(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION);
+        switch (uidGeneration) {
+            case PLAN_ONLY:
+                return isCompiled
+                        && !get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS);
+            case ALWAYS:
+                return true;
+            case DISABLED:
+                return false;
+            default:
+                throw new IllegalArgumentException(
+                        "Unknown UID generation strategy: " + uidGeneration);
+        }
+    }
 }
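
A sanity-check sketch of the resulting decision matrix (pseudo-test under the assumption that the node config passed to ofNodeConfig takes effect via ExecNodeConfig#get; the legacy flag keeps its default of false):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;
    import org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration;

    Configuration conf = new Configuration();
    conf.set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION, UidGeneration.PLAN_ONLY);
    // PLAN_ONLY: UIDs only for nodes originating from a compiled plan.
    assert ExecNodeConfig.ofNodeConfig(conf, true).shouldSetUid();
    assert !ExecNodeConfig.ofNodeConfig(conf, false).shouldSetUid();
    // ALWAYS: UIDs regardless of how the pipeline was created.
    conf.set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION, UidGeneration.ALWAYS);
    assert ExecNodeConfig.ofNodeConfig(conf, false).shouldSetUid();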
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
index 02213bd4440..beadfc4f125 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
@@ -48,15 +48,15 @@ public class ExecNodeGraphGenerator {
         this.visitedRels = new IdentityHashMap<>();
     }
 
-    public ExecNodeGraph generate(List<FlinkPhysicalRel> relNodes) {
+    public ExecNodeGraph generate(List<FlinkPhysicalRel> relNodes, boolean isCompiled) {
         List<ExecNode<?>> rootNodes = new ArrayList<>(relNodes.size());
         for (FlinkPhysicalRel relNode : relNodes) {
-            rootNodes.add(generate(relNode));
+            rootNodes.add(generate(relNode, isCompiled));
         }
         return new ExecNodeGraph(rootNodes);
     }
 
-    private ExecNode<?> generate(FlinkPhysicalRel rel) {
+    private ExecNode<?> generate(FlinkPhysicalRel rel, boolean isCompiled) {
         ExecNode<?> execNode = visitedRels.get(rel);
         if (execNode != null) {
             return execNode;
@@ -68,10 +68,10 @@ public class ExecNodeGraphGenerator {
 
         List<ExecNode<?>> inputNodes = new ArrayList<>();
         for (RelNode input : rel.getInputs()) {
-            inputNodes.add(generate((FlinkPhysicalRel) input));
+            inputNodes.add(generate((FlinkPhysicalRel) input, isCompiled));
         }
 
-        execNode = rel.translateToExecNode();
+        execNode = rel.translateToExecNode(isCompiled);
         // connects the input nodes
         List<ExecEdge> inputEdges = new ArrayList<>(inputNodes.size());
         for (ExecNode<?> inputNode : inputNodes) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTranslator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTranslator.java
index ccf4a4fad6d..74899c5b50e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTranslator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTranslator.java
@@ -36,7 +36,7 @@ public interface ExecNodeTranslator<T> {
      *
      * <p>NOTE: This method should return same translate result if called multiple times.
      *
-     * @param planner The {@link Planner} of the translated Table.
+     * @param planner The {@link Planner} of the translated graph.
      */
     Transformation<T> translateToPlan(Planner planner);
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
index d961b5a4c96..00e0560ac80 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
@@ -210,7 +210,7 @@ public abstract class CommonExecMatch extends ExecNodeBase<RowData>
     }
 
     protected Transformation<RowData> translateOrder(
-            Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
+            Transformation<RowData> inputTransform, RowType inputRowType, ExecNodeConfig config) {
         return inputTransform;
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
index 0de1483db13..8661fd9b5b6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
@@ -103,10 +103,12 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
-        OneInputTransformation<RowData, RowData> transform =
+        final ExecNodeConfig pythonNodeConfig =
+                ExecNodeConfig.ofNodeConfig(pythonConfig, config.isCompiled());
+        final OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform,
-                        config,
+                        pythonNodeConfig,
                         planner.getFlinkContext().getClassLoader(),
                         pythonConfig);
         if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
@@ -117,7 +119,7 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
 
     private OneInputTransformation<RowData, RowData> createPythonOneInputTransformation(
             Transformation<RowData> inputTransform,
-            ExecNodeConfig config,
+            ExecNodeConfig pythonNodeConfig,
             ClassLoader classLoader,
             Configuration pythonConfig) {
         Tuple2<int[], PythonFunctionInfo> extractResult = extractPythonTableFunctionInfo();
@@ -129,7 +131,7 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
                 InternalTypeInfo.of((RowType) getOutputType());
         OneInputStreamOperator<RowData, RowData> pythonOperator =
                 getPythonTableFunctionOperator(
-                        config,
+                        pythonNodeConfig,
                         classLoader,
                         pythonConfig,
                         pythonOperatorInputRowType,
@@ -138,7 +140,7 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
                         pythonUdtfInputOffsets);
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationMeta(PYTHON_CORRELATE_TRANSFORMATION, pythonConfig),
+                createTransformationMeta(PYTHON_CORRELATE_TRANSFORMATION, pythonNodeConfig),
                 pythonOperator,
                 pythonOperatorOutputRowType,
                 inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 6a541917461..5831bd3ed76 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -52,6 +52,7 @@ import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
@@ -135,7 +136,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
     @SuppressWarnings("unchecked")
     protected Transformation<Object> createSinkTransformation(
             StreamExecutionEnvironment streamExecEnv,
-            ReadableConfig config,
+            ExecNodeConfig config,
             ClassLoader classLoader,
             Transformation<RowData> inputTransform,
             DynamicTableSink tableSink,
@@ -210,7 +211,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
      */
     private Transformation<RowData> applyConstraintValidations(
             Transformation<RowData> inputTransform,
-            ReadableConfig config,
+            ExecNodeConfig config,
             RowType physicalRowType) {
         final ConstraintEnforcer.Builder validatorBuilder = ConstraintEnforcer.newBuilder();
         final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
@@ -355,7 +356,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
      * messages.
      */
     private Transformation<RowData> applyKeyBy(
-            ReadableConfig config,
+            ExecNodeConfig config,
             ClassLoader classLoader,
             Transformation<RowData> inputTransform,
             int[] primaryKeys,
@@ -399,7 +400,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             Transformation<RowData> inputTransform,
             int[] primaryKeys,
             int sinkParallelism,
-            ReadableConfig config,
+            ExecNodeConfig config,
             ClassLoader classLoader,
             RowType physicalRowType) {
         GeneratedRecordEqualiser equaliser =
@@ -444,7 +445,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             SinkRuntimeProvider runtimeProvider,
             int rowtimeFieldIndex,
             int sinkParallelism,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
         TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config);
         if (runtimeProvider instanceof DataStreamSinkProvider) {
             Transformation<RowData> sinkTransformation =
@@ -528,10 +529,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         }
     }
 
-    private ProviderContext createProviderContext(ReadableConfig config) {
+    private ProviderContext createProviderContext(ExecNodeConfig config) {
         return name -> {
-            if (this instanceof StreamExecNode
-                    && !config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
+            if (this instanceof StreamExecNode && config.shouldSetUid()) {
                 return Optional.of(createTransformationUid(name));
             }
             return Optional.empty();
@@ -566,7 +566,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             Transformation<RowData> inputTransform,
             int rowtimeFieldIndex,
             int sinkParallelism,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
         // Don't apply the transformation/operator if there is no rowtimeFieldIndex
         if (rowtimeFieldIndex == -1) {
             return inputTransform;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
index 97d5fd86455..baebd1af435 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.InputFormatProvider;
@@ -156,10 +155,9 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
         }
     }
 
-    private ProviderContext createProviderContext(ReadableConfig config) {
+    private ProviderContext createProviderContext(ExecNodeConfig config) {
         return name -> {
-            if (this instanceof StreamExecNode
-                    && !config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
+            if (this instanceof StreamExecNode && config.shouldSetUid()) {
                 return Optional.of(createTransformationUid(name));
             }
             return Optional.empty();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 13c36e40029..dffd134e97a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -48,6 +48,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.TreeNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.InjectableValues;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Module;
@@ -112,7 +113,8 @@ public class JsonSerdeUtil {
     public static ObjectReader createObjectReader(SerdeContext serdeContext) {
         return OBJECT_MAPPER_INSTANCE
                 .reader()
-                .withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext);
+                .withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext)
+                .with(defaultInjectedValues());
     }
 
     public static ObjectWriter createObjectWriter(SerdeContext serdeContext) {
@@ -121,6 +123,10 @@ public class JsonSerdeUtil {
                 .withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext);
     }
 
+    private static InjectableValues defaultInjectedValues() {
+        return new InjectableValues.Std().addValue("isDeserialize", true);
+    }
+
     private static Module createFlinkTableJacksonModule() {
         final SimpleModule module = new SimpleModule("Flink table module");
         ExecNodeMetadataUtil.execNodes()
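
Together with the @JacksonInject("isDeserialize") field in ExecNodeBase above, this injection is what flips isCompiled to true for every node restored from JSON. A self-contained sketch of the mechanism with plain (unshaded) Jackson; the Node class is illustrative only:

    import com.fasterxml.jackson.annotation.JacksonInject;
    import com.fasterxml.jackson.databind.InjectableValues;
    import com.fasterxml.jackson.databind.ObjectMapper;

    class Node {
        @JacksonInject("isDeserialize")
        public boolean compiled; // injected by the reader, not read from the JSON
        public String name;
    }

    Node node =
            new ObjectMapper()
                    .reader()
                    .with(new InjectableValues.Std().addValue("isDeserialize", true))
                    .forType(Node.class)
                    .readValue("{\"name\":\"scan\"}");
    // node.compiled == true although the JSON contains no such field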
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index be130e6e4e9..347203641e6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -342,7 +342,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData>
             IntervalJoinFunction joinFunction,
             JoinSpec joinSpec,
             IntervalJoinSpec.WindowBounds windowBounds,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
         InternalTypeInfo<RowData> leftTypeInfo =
                 (InternalTypeInfo<RowData>) leftInputTransform.getOutputType();
         InternalTypeInfo<RowData> rightTypeInfo =
@@ -372,7 +372,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData>
             IntervalJoinFunction joinFunction,
             JoinSpec joinSpec,
             IntervalJoinSpec.WindowBounds windowBounds,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
 
         InternalTypeInfo<RowData> leftTypeInfo =
                 (InternalTypeInfo<RowData>) leftInputTransform.getOutputType();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 6bc018528d8..e774193f762 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
@@ -120,7 +121,7 @@ public class StreamExecMatch extends CommonExecMatch
 
     @Override
     public Transformation<RowData> translateOrder(
-            Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
+            Transformation<RowData> inputTransform, RowType inputRowType, ExecNodeConfig config) {
         SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
         int timeOrderFieldIdx = timeOrderField.getFieldIndex();
         LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
index df725ff3d85..8aa55962285 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -257,8 +257,10 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                 generateWindowAssignerAndTrigger();
         WindowAssigner<?> windowAssigner = windowAssignerAndTrigger.f0;
         Trigger<?> trigger = windowAssignerAndTrigger.f1;
-        Configuration pythonConfig =
+        final Configuration pythonConfig =
                 CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+        final ExecNodeConfig pythonNodeConfig =
+                ExecNodeConfig.ofNodeConfig(pythonConfig, config.isCompiled());
         boolean isGeneralPythonUDAF =
                 Arrays.stream(aggCalls)
                         .anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.GENERAL));
@@ -286,6 +288,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                             aggInfoList,
                             emitStrategy.getAllowLateness(),
                             pythonConfig,
+                            pythonNodeConfig,
                             shiftTimeZone);
         } else {
             transform =
@@ -298,7 +301,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                             trigger,
                             emitStrategy.getAllowLateness(),
                             pythonConfig,
-                            config,
+                            pythonNodeConfig,
                             planner.getFlinkContext().getClassLoader(),
                             shiftTimeZone);
         }
@@ -431,7 +434,8 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                     WindowAssigner<?> windowAssigner,
                     AggregateInfoList aggInfoList,
                     long allowance,
-                    Configuration config,
+                    Configuration pythonConfig,
+                    ExecNodeConfig pythonNodeConfig,
                     ZoneId shiftTimeZone) {
         final int inputCountIndex = aggInfoList.getIndexOfCountStar();
         final boolean countStarInserted = aggInfoList.countStarInserted();
@@ -441,7 +445,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
         DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1;
         OneInputStreamOperator<RowData, RowData> pythonOperator =
                 getGeneralPythonStreamGroupWindowAggregateFunctionOperator(
-                        config,
+                        pythonConfig,
                         inputRowType,
                         outputRowType,
                         windowAssigner,
@@ -456,7 +460,8 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationMeta(PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION, config),
+                createTransformationMeta(
+                        PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION, pythonNodeConfig),
                 pythonOperator,
                 InternalTypeInfo.of(outputRowType),
                 inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index d0ca7bba606..8fb8f80d37d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -193,7 +193,7 @@ abstract class PlannerBase(
 
     val relNodes = modifyOperations.map(translateToRel)
     val optimizedRelNodes = optimize(relNodes)
-    val execGraph = translateToExecNodeGraph(optimizedRelNodes)
+    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
     val transformations = translateToPlan(execGraph)
     afterTranslation()
     transformations
@@ -329,7 +329,9 @@ abstract class PlannerBase(
    * transforms the graph based on the given processors.
    */
   @VisibleForTesting
-  private[flink] def translateToExecNodeGraph(optimizedRelNodes: Seq[RelNode]): ExecNodeGraph = {
+  private[flink] def translateToExecNodeGraph(
+      optimizedRelNodes: Seq[RelNode],
+      isCompiled: Boolean): ExecNodeGraph = {
     val nonPhysicalRel = optimizedRelNodes.filterNot(_.isInstanceOf[FlinkPhysicalRel])
     if (nonPhysicalRel.nonEmpty) {
       throw new TableException(
@@ -341,7 +343,8 @@ abstract class PlannerBase(
 
     // convert FlinkPhysicalRel DAG to ExecNodeGraph
     val generator = new ExecNodeGraphGenerator()
-    val execGraph = generator.generate(optimizedRelNodes.map(_.asInstanceOf[FlinkPhysicalRel]))
+    val execGraph =
+      generator.generate(optimizedRelNodes.map(_.asInstanceOf[FlinkPhysicalRel]), isCompiled)
 
     // process the graph
     val context = new ProcessorContext(this)
@@ -522,8 +525,7 @@ abstract class PlannerBase(
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
     }
     val optimizedRelNodes = optimize(sinkRelNodes)
-    val execGraph = translateToExecNodeGraph(optimizedRelNodes)
-
+    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
     val transformations = translateToPlan(execGraph)
     afterTranslation()
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 2431d9c45e8..1cae6cd8b3a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -179,7 +179,7 @@ class StreamPlanner(
     beforeTranslation()
     val relNodes = modifyOperations.map(translateToRel)
     val optimizedRelNodes = optimize(relNodes)
-    val execGraph = translateToExecNodeGraph(optimizedRelNodes)
+    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = true)
     afterTranslation()
 
     new ExecNodeGraphInternalPlan(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/FlinkPhysicalRel.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/FlinkPhysicalRel.scala
index 3857bb4d783..7a8d4c5c52e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/FlinkPhysicalRel.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/FlinkPhysicalRel.scala
@@ -39,6 +39,22 @@ trait FlinkPhysicalRel extends FlinkRelNode {
    */
   def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = None
 
+  /**
+   * Translate this physical RelNode into an [[ExecNode]].
+   *
+   * NOTE: This method only needs to create the corresponding ExecNode; the connection to its
+   * input/output nodes will be done by the ExecNodeGraphGenerator. Some physical rels need not
+   * be translated to a real ExecNode, e.g. Exchange will be translated to an edge in the future.
+   *
+   * @param isCompiled
+   *   Whether the translation happens as part of a plan compilation.
+   */
+  def translateToExecNode(isCompiled: Boolean): ExecNode[_] = {
+    val execNode = translateToExecNode()
+    execNode.setCompiled(isCompiled)
+    execNode
+  }
+
   /**
    * Translate this physical RelNode into an [[ExecNode]].
    *
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/CompiledPlanUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/CompiledPlanUtils.java
index d9fcf846b99..2382b44c659 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/CompiledPlanUtils.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/CompiledPlanUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.api.internal;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.ExecNodeGraphInternalPlan;
 
 import java.util.List;
@@ -40,7 +39,6 @@ public class CompiledPlanUtils {
     public static List<Transformation<?>> toTransformations(
             TableEnvironment env, CompiledPlan compiledPlan) {
         final ExecNodeGraphInternalPlan internalPlan = unwrap(compiledPlan);
-        return ((PlannerBase) ((TableEnvironmentImpl) env).getPlanner())
-                .translateToPlan(internalPlan.getExecNodeGraph());
+        return ((TableEnvironmentImpl) env).getPlanner().translatePlan(internalPlan);
     }
 }
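
A sketch of how tests consume this helper, mirroring TransformationsTest below:

    CompiledPlan plan = table.insertInto("sink_table").compilePlan();
    List<Transformation<?>> transformations =
            CompiledPlanUtils.toTransformations(env, plan);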
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
index 6dfe26b54a5..b591cca3b43 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/LongAdaptiveHashJoinGeneratorTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.codegen;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
@@ -88,7 +87,7 @@ public class LongAdaptiveHashJoinGeneratorTest extends Int2AdaptiveHashJoinOpera
             sortMergeJoinFunction =
                     SorMergeJoinOperatorUtil.getSortMergeJoinFunction(
                             Thread.currentThread().getContextClassLoader(),
-                            new ExecNodeConfig(TableConfig.getDefault(), new Configuration()),
+                            ExecNodeConfig.ofNodeConfig(new Configuration(), false),
                             flinkJoinType,
                             buildType,
                             probeType,
@@ -103,7 +102,7 @@ public class LongAdaptiveHashJoinGeneratorTest extends Int2AdaptiveHashJoinOpera
             sortMergeJoinFunction =
                     SorMergeJoinOperatorUtil.getSortMergeJoinFunction(
                             Thread.currentThread().getContextClassLoader(),
-                            new ExecNodeConfig(TableConfig.getDefault(), new Configuration()),
+                            ExecNodeConfig.ofNodeConfig(new Configuration(), false),
                             flinkJoinType,
                             probeType,
                             buildType,
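
The new factory replaces the removed public constructor. Roughly, as a sketch (the trailing
boolean is presumably the isCompiled flag introduced by this commit):

    ExecNodeConfig config = ExecNodeConfig.ofNodeConfig(new Configuration(), false);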
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
index 7b8cfe39ddc..0f06d0877eb 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
@@ -106,6 +106,11 @@ public class TestingBatchExecNode implements BatchExecNode<RowData> {
         visitor.visit(this);
     }
 
+    @Override
+    public void setCompiled(boolean isCompiled) {
+        throw new TableException("Unsupported operation.");
+    }
+
     @Override
     public String toString() {
         return description;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
index b0667b14ff9..20cf5080f90 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -26,12 +26,12 @@ import org.apache.flink.streaming.api.transformations.WithBoundedness;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.PlanReference;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableDescriptor;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.CompiledPlanUtils;
 
 import org.junit.jupiter.api.Test;
@@ -39,8 +39,15 @@ import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
 
 import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.ALWAYS;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.DISABLED;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.PLAN_ONLY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.InstanceOfAssertFactories.type;
 
@@ -108,10 +115,28 @@ class TransformationsTest {
     }
 
     @Test
-    public void testLegacyUid() {
-        final TableEnvironment env =
-                TableEnvironment.create(EnvironmentSettings.inStreamingMode().getConfiguration());
-        env.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+    public void testUidGeneration() {
+        checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY), true, false);
+        checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, ALWAYS), true, true);
+        checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, DISABLED), false, false);
+        checkUids(
+                c -> {
+                    c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY);
+                    c.set(TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+                },
+                false,
+                false);
+    }
+
+    private static void checkUids(
+            Consumer<TableConfig> config,
+            boolean expectUidWithCompilation,
+            boolean expectUidWithoutCompilation) {
+        final StreamTableEnvironment env =
+                StreamTableEnvironment.create(
+                        StreamExecutionEnvironment.getExecutionEnvironment(),
+                        EnvironmentSettings.newInstance().inStreamingMode().build());
+        config.accept(env.getConfig());
 
         env.createTemporaryTable(
                 "source_table",
@@ -122,24 +147,49 @@ class TransformationsTest {
         env.createTemporaryTable(
                 "sink_table", 
TableDescriptor.forConnector("values").schema(dummySchema()).build());
 
-        final CompiledPlan compiledPlan =
-                env.from("source_table")
-                        .select($("i").abs())
-                        .insertInto("sink_table")
-                        .compilePlan();
-
-        // There should be 3 transformations in this list: sink -> calc -> source
-        final List<Transformation<?>> transformations =
-                CompiledPlanUtils.toTransformations(env, compiledPlan)
-                        .get(0)
-                        .getTransitivePredecessors();
-        assertThat(transformations).hasSize(3);
-
-        // As the sink and source might set the uid, we only check the calc transformation.
-        assertThat(transformations)
-                .element(1, type(Transformation.class))
-                .extracting(Transformation::getUid)
-                .isNull();
+        // There should be 3 transformations: sink -> calc -> source
+        final Table table = env.from("source_table").select($("i").abs());
+
+        // Uses in-memory ExecNodes
+        final CompiledPlan memoryPlan = table.insertInto("sink_table").compilePlan();
+        final List<String> memoryUids =
+                CompiledPlanUtils.toTransformations(env, memoryPlan).get(0)
+                        .getTransitivePredecessors().stream()
+                        .map(Transformation::getUid)
+                        .collect(Collectors.toList());
+        assertThat(memoryUids).hasSize(3);
+        if (expectUidWithCompilation) {
+            assertThat(memoryUids).allSatisfy(u -> assertThat(u).isNotNull());
+        } else {
+            assertThat(memoryUids).allSatisfy(u -> assertThat(u).isNull());
+        }
+
+        // Uses deserialized ExecNodes
+        final String jsonPlan = table.insertInto("sink_table").compilePlan().asJsonString();
+        final List<String> jsonUids =
+                CompiledPlanUtils.toTransformations(
+                                env, env.loadPlan(PlanReference.fromJsonString(jsonPlan)))
+                        .get(0).getTransitivePredecessors().stream()
+                        .map(Transformation::getUid)
+                        .collect(Collectors.toList());
+        assertThat(jsonUids).hasSize(3);
+        if (expectUidWithCompilation) {
+            assertThat(jsonUids).allSatisfy(u -> assertThat(u).isNotNull());
+        } else {
+            assertThat(jsonUids).allSatisfy(u -> assertThat(u).isNull());
+        }
+
+        final List<String> inlineUids =
+                env.toChangelogStream(table).getTransformation().getTransitivePredecessors()
+                        .stream()
+                        .map(Transformation::getUid)
+                        .collect(Collectors.toList());
+        assertThat(inlineUids).hasSize(3);
+        if (expectUidWithoutCompilation) {
+            assertThat(inlineUids).allSatisfy(u -> assertThat(u).isNotNull());
+        } else {
+            assertThat(inlineUids).allSatisfy(u -> assertThat(u).isNull());
+        }
     }
 
    // --------------------------------------------------------------------------------------------
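
For reference, the JSON round trip exercised above reduces to this sketch (names as in the
test):

    String json = table.insertInto("sink_table").compilePlan().asJsonString();
    CompiledPlan restored = env.loadPlan(PlanReference.fromJsonString(json));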
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
index 078a7f5ef68..08679fd4a07 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
@@ -105,7 +105,8 @@ public class MultipleInputNodeCreationProcessorTest extends TableTestBase {
         RelNode relNode = TableTestUtil.toRelNode(table);
         FlinkPhysicalRel optimizedRel = (FlinkPhysicalRel) util.getPlanner().optimize(relNode);
         ExecNodeGraphGenerator generator = new ExecNodeGraphGenerator();
-        ExecNodeGraph execGraph = generator.generate(Collections.singletonList(optimizedRel));
+        ExecNodeGraph execGraph =
+                generator.generate(Collections.singletonList(optimizedRel), false);
         ExecNode<?> execNode = execGraph.getRootNodes().get(0);
         while (!execNode.getInputEdges().isEmpty()) {
             execNode = execNode.getInputEdges().get(0).getSource();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
index 2a42585a3c4..8d1c182d3ea 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
@@ -72,7 +72,8 @@ public class InternalConfigOptionsTest extends TableTestBase {
                 tEnv.sqlQuery("SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_TIME, 
CURRENT_TIMESTAMP");
         RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
         ExecNodeGraph execNodeGraph =
-                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)));
+                planner.translateToExecNodeGraph(
+                        toScala(Collections.singletonList(relNode)), false);
        // PlannerBase#translateToExecNodeGraph will set internal temporal configurations and
        // clean them up after the translation has finished
        List<Transformation<?>> transformation = planner.translateToPlan(execNodeGraph);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 35499ba9439..cf851cb5b9a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.ExecutionOptions
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
 import org.apache.flink.streaming.api.{environment, TimeCharacteristic}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment}
@@ -985,7 +985,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
 
     // build optimized exec plan if `expectedPlans` contains OPT_EXEC
     val optimizedExecPlan = if (expectedPlans.contains(PlanKind.OPT_EXEC)) {
-      val execGraph = getPlanner.translateToExecNodeGraph(optimizedRels)
+      val execGraph = getPlanner.translateToExecNodeGraph(optimizedRels, isCompiled = false)
       System.lineSeparator + ExecNodePlanDumper.dagToString(execGraph)
     } else {
       ""

Reply via email to