[ https://issues.apache.org/jira/browse/FLINK-31884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717446#comment-17717446 ]

Godfrey He commented on FLINK-31884:
------------------------------------

Fixed in master: 3664609c7622ccae80e36e85099a1b79b5935fe9

> Upgrade ExecNode to new version causes the old serialized plan to fail the Json SerDe round trip
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31884
>                 URL: https://issues.apache.org/jira/browse/FLINK-31884
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Jane Chan
>            Assignee: Jane Chan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0
>
>
> h4. How to Reproduce
> Firstly, add a test to dump the compiled plan JSON.
> {code:java}
> @Test
> public void debug() {
>     tableEnv.executeSql("create table foo (f0 int, f1 string) with 
> ('connector' = 'datagen')");
>     tableEnv.executeSql("create table bar (f0 int, f1 string) with 
> ('connector' = 'print')");
>     tableEnv.compilePlanSql("insert into bar select * from foo")
>             .writeToFile(new File("/path/to/debug.json"));
> }
> {code}
> The dumped JSON content is as follows:
> {code:json}
> {
>   "flinkVersion" : "1.18",
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "stream-exec-table-source-scan_1",
>     "scanTableSource" : {
>       "table" : {
>         "identifier" : "`default_catalog`.`default_database`.`foo`",
>         "resolvedTable" : {
>           "schema" : {
>             "columns" : [ {
>               "name" : "f0",
>               "dataType" : "INT"
>             }, {
>               "name" : "f1",
>               "dataType" : "VARCHAR(2147483647)"
>             } ],
>             "watermarkSpecs" : [ ]
>           },
>           "partitionKeys" : [ ],
>           "options" : {
>             "connector" : "datagen"
>           }
>         }
>       }
>     },
>     "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
>     "description" : "TableSourceScan(table=[[default_catalog, 
> default_database, foo]], fields=[f0, f1])",
>     "inputProperties" : [ ]
>   }, {
>     "id" : 2,
>     "type" : "stream-exec-sink_1",
>     "configuration" : {
>       "table.exec.sink.keyed-shuffle" : "AUTO",
>       "table.exec.sink.not-null-enforcer" : "ERROR",
>       "table.exec.sink.type-length-enforcer" : "IGNORE",
>       "table.exec.sink.upsert-materialize" : "AUTO"
>     },
>     "dynamicTableSink" : {
>       "table" : {
>         "identifier" : "`default_catalog`.`default_database`.`bar`",
>         "resolvedTable" : {
>           "schema" : {
>             "columns" : [ {
>               "name" : "f0",
>               "dataType" : "INT"
>             }, {
>               "name" : "f1",
>               "dataType" : "VARCHAR(2147483647)"
>             } ],
>             "watermarkSpecs" : [ ]
>           },
>           "partitionKeys" : [ ],
>           "options" : {
>             "connector" : "print"
>           }
>         }
>       }
>     },
>     "inputChangelogMode" : [ "INSERT" ],
>     "inputProperties" : [ {
>       "requiredDistribution" : {
>         "type" : "UNKNOWN"
>       },
>       "damBehavior" : "PIPELINED",
>       "priority" : 0
>     } ],
>     "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
>     "description" : "Sink(table=[default_catalog.default_database.bar], 
> fields=[f0, f1])"
>   } ],
>   "edges" : [ {
>     "source" : 1,
>     "target" : 2,
>     "shuffle" : {
>       "type" : "FORWARD"
>     },
>     "shuffleMode" : "PIPELINED"
>   } ]
> }
> {code}
> Then upgrade StreamExecSink to a new version by adding a second @ExecNodeMetadata annotation:
> {code:java}
> @ExecNodeMetadata(
>         name = "stream-exec-sink",
>         version = 1,
>         consumedOptions = {
>             "table.exec.sink.not-null-enforcer",
>             "table.exec.sink.type-length-enforcer",
>             "table.exec.sink.upsert-materialize",
>             "table.exec.sink.keyed-shuffle"
>         },
>         producedTransformations = {
>             CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
>             CommonExecSink.PARTITIONER_TRANSFORMATION,
>             CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
>             CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
>             CommonExecSink.SINK_TRANSFORMATION
>         },
>         minPlanVersion = FlinkVersion.v1_15,
>         minStateVersion = FlinkVersion.v1_15)
> @ExecNodeMetadata(
>         name = "stream-exec-sink",
>         version = 2,
>         consumedOptions = {
>             "table.exec.sink.not-null-enforcer",
>             "table.exec.sink.type-length-enforcer",
>             "table.exec.sink.upsert-materialize",
>             "table.exec.sink.keyed-shuffle"
>         },
>         producedTransformations = {
>             CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
>             CommonExecSink.PARTITIONER_TRANSFORMATION,
>             CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
>             CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
>             CommonExecSink.SINK_TRANSFORMATION
>         },
>         minPlanVersion = FlinkVersion.v1_18,
>         minStateVersion = FlinkVersion.v1_15)
> public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
> }
> {code}
> Then load the previously dumped plan and print it as JSON text:
> {code:java}
> tableEnv.loadPlan(PlanReference.fromFile("/path/to/debug.json")).printJsonString();
> {code}
> The SerDe round trip loses idempotence: the type of the StreamExecSink node becomes stream-exec-sink_2 even though the plan was serialized with version 1.
> {code:json}
> {
>   "flinkVersion" : "1.18",
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "stream-exec-table-source-scan_1",
>     "scanTableSource" : {
>       "table" : {
>         "identifier" : "`default_catalog`.`default_database`.`foo`"
>       }
>     },
>     "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
>     "description" : "TableSourceScan(table=[[default_catalog, 
> default_database, foo]], fields=[f0, f1])",
>     "inputProperties" : [ ]
>   }, {
>     "id" : 2,
>     "type" : "stream-exec-sink_2",
>     "configuration" : {
>       "table.exec.sink.keyed-shuffle" : "AUTO",
>       "table.exec.sink.not-null-enforcer" : "ERROR",
>       "table.exec.sink.type-length-enforcer" : "IGNORE",
>       "table.exec.sink.upsert-materialize" : "AUTO"
>     },
>     "dynamicTableSink" : {
>       "table" : {
>         "identifier" : "`default_catalog`.`default_database`.`bar`"
>       }
>     },
>     "inputChangelogMode" : [ "INSERT" ],
>     "inputProperties" : [ {
>       "requiredDistribution" : {
>         "type" : "UNKNOWN"
>       },
>       "damBehavior" : "PIPELINED",
>       "priority" : 0
>     } ],
>     "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
>     "description" : "Sink(table=[default_catalog.default_database.bar], 
> fields=[f0, f1])"
>   } ],
>   "edges" : [ {
>     "source" : 1,
>     "target" : 2,
>     "shuffle" : {
>       "type" : "FORWARD"
>     },
>     "shuffleMode" : "PIPELINED"
>   } ]
> }
> {code}
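> A faithful round trip should be idempotent: loading the dumped plan and serializing it again should reproduce the original JSON, including the versioned node types. A minimal regression check along these lines (a sketch reusing the tableEnv from the repro above; the test name and assertion framework are assumptions):
> {code:java}
> @Test
> public void testPlanJsonRoundTripIsIdempotent() throws Exception {
>     // Plan JSON dumped by the compilePlanSql(...) step above.
>     String originalJson = Files.readString(Paths.get("/path/to/debug.json"));
>     // Re-serialize the loaded plan; node types such as
>     // "stream-exec-sink_1" must survive unchanged.
>     String roundTrippedJson =
>             tableEnv.loadPlan(PlanReference.fromJsonString(originalJson)).asJsonString();
>     assertEquals(originalJson, roundTrippedJson);
> }
> {code}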
> h4. Root Cause
> ExecNodeBase#getContextFromAnnotation always uses the newest ExecNode version for SerDe. As a result, although the deserialized CompiledPlan object is correct, #printJsonString creates a new context with the newest version.
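> In simplified form, the problematic behavior looks like this (an illustrative sketch, not the actual Flink source):
> {code:java}
> // ExecNodeBase, simplified: the context is always rebuilt from the newest
> // @ExecNodeMetadata annotation on the class, so a node deserialized as
> // "stream-exec-sink_1" is re-serialized as "stream-exec-sink_2".
> protected ExecNodeContext getContextFromAnnotation() {
>     return ExecNodeContext.newContext(getClass()).withId(getId());
> }
> {code}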
>  
> h4. Suggested Fix
> If the member variable `isCompiled` is true, #getContextFromAnnotation should return the context that was read from the JSON plan instead of instantiating a new one.
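> A sketch of that fix, using the names from the description above (the actual change may differ in detail):
> {code:java}
> // ExecNodeBase, sketch: when the node was restored from a compiled plan
> // (isCompiled == true), reuse the context deserialized from the JSON
> // instead of rebuilding it from the newest annotation.
> protected ExecNodeContext getContextFromAnnotation() {
>     return isCompiled
>             ? context
>             : ExecNodeContext.newContext(getClass()).withId(getId());
> }
> {code}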
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
