[ https://issues.apache.org/jira/browse/FLINK-31884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717446#comment-17717446 ]
Godfrey He commented on FLINK-31884: ------------------------------------ Fixed in master: 3664609c7622ccae80e36e85099a1b79b5935fe9 > Upgrade ExecNode to new version causes the old serialized plan failed to pass > Json SerDe round trip > --------------------------------------------------------------------------------------------------- > > Key: FLINK-31884 > URL: https://issues.apache.org/jira/browse/FLINK-31884 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.18.0 > Reporter: Jane Chan > Assignee: Jane Chan > Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > h4. How to Reproduce > Firstly, add a test to dump the compiled plan JSON. > {code:java} > @Test > public void debug() { > tableEnv.executeSql("create table foo (f0 int, f1 string) with > ('connector' = 'datagen')"); > tableEnv.executeSql("create table bar (f0 int, f1 string) with > ('connector' = 'print')"); > tableEnv.compilePlanSql("insert into bar select * from foo") > .writeToFile(new File("/path/to/debug.json")); > } > {code} > The JSON context is as follows > {code:json} > { > "flinkVersion" : "1.18", > "nodes" : [ { > "id" : 1, > "type" : "stream-exec-table-source-scan_1", > "scanTableSource" : { > "table" : { > "identifier" : "`default_catalog`.`default_database`.`foo`", > "resolvedTable" : { > "schema" : { > "columns" : [ { > "name" : "f0", > "dataType" : "INT" > }, { > "name" : "f1", > "dataType" : "VARCHAR(2147483647)" > } ], > "watermarkSpecs" : [ ] > }, > "partitionKeys" : [ ], > "options" : { > "connector" : "datagen" > } > } > } > }, > "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>", > "description" : "TableSourceScan(table=[[default_catalog, > default_database, foo]], fields=[f0, f1])", > "inputProperties" : [ ] > }, { > "id" : 2, > "type" : "stream-exec-sink_1", > "configuration" : { > "table.exec.sink.keyed-shuffle" : "AUTO", > "table.exec.sink.not-null-enforcer" : "ERROR", > "table.exec.sink.type-length-enforcer" : 
"IGNORE", > "table.exec.sink.upsert-materialize" : "AUTO" > }, > "dynamicTableSink" : { > "table" : { > "identifier" : "`default_catalog`.`default_database`.`bar`", > "resolvedTable" : { > "schema" : { > "columns" : [ { > "name" : "f0", > "dataType" : "INT" > }, { > "name" : "f1", > "dataType" : "VARCHAR(2147483647)" > } ], > "watermarkSpecs" : [ ] > }, > "partitionKeys" : [ ], > "options" : { > "connector" : "print" > } > } > } > }, > "inputChangelogMode" : [ "INSERT" ], > "inputProperties" : [ { > "requiredDistribution" : { > "type" : "UNKNOWN" > }, > "damBehavior" : "PIPELINED", > "priority" : 0 > } ], > "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>", > "description" : "Sink(table=[default_catalog.default_database.bar], > fields=[f0, f1])" > } ], > "edges" : [ { > "source" : 1, > "target" : 2, > "shuffle" : { > "type" : "FORWARD" > }, > "shuffleMode" : "PIPELINED" > } ] > } > {code} > Then upgrade the StreamExecSink to a new version > {code:java} > @ExecNodeMetadata( > name = "stream-exec-sink", > version = 1, > consumedOptions = { > "table.exec.sink.not-null-enforcer", > "table.exec.sink.type-length-enforcer", > "table.exec.sink.upsert-materialize", > "table.exec.sink.keyed-shuffle" > }, > producedTransformations = { > CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION, > CommonExecSink.PARTITIONER_TRANSFORMATION, > CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION, > CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION, > CommonExecSink.SINK_TRANSFORMATION > }, > minPlanVersion = FlinkVersion.v1_15, > minStateVersion = FlinkVersion.v1_15) > @ExecNodeMetadata( > name = "stream-exec-sink", > version = 2, > consumedOptions = { > "table.exec.sink.not-null-enforcer", > "table.exec.sink.type-length-enforcer", > "table.exec.sink.upsert-materialize", > "table.exec.sink.keyed-shuffle" > }, > producedTransformations = { > CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION, > CommonExecSink.PARTITIONER_TRANSFORMATION, > 
CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION, > CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION, > CommonExecSink.SINK_TRANSFORMATION > }, > minPlanVersion = FlinkVersion.v1_18, > minStateVersion = FlinkVersion.v1_15) > public class StreamExecSink extends CommonExecSink implements > StreamExecNode<Object> { > } > {code} > And then load the previous plan and print it as JSON text > {code:java} > tableEnv.loadPlan(PlanReference.fromFile("/path/to/debug.json")).printJsonString(); > {code} > The SerDe lost idempotence since the version for StreamExecSink became > version 2. > {code:json} > { > "flinkVersion" : "1.18", > "nodes" : [ { > "id" : 1, > "type" : "stream-exec-table-source-scan_1", > "scanTableSource" : { > "table" : { > "identifier" : "`default_catalog`.`default_database`.`foo`" > } > }, > "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>", > "description" : "TableSourceScan(table=[[default_catalog, > default_database, foo]], fields=[f0, f1])", > "inputProperties" : [ ] > }, { > "id" : 2, > "type" : "stream-exec-sink_2", > "configuration" : { > "table.exec.sink.keyed-shuffle" : "AUTO", > "table.exec.sink.not-null-enforcer" : "ERROR", > "table.exec.sink.type-length-enforcer" : "IGNORE", > "table.exec.sink.upsert-materialize" : "AUTO" > }, > "dynamicTableSink" : { > "table" : { > "identifier" : "`default_catalog`.`default_database`.`bar`" > } > }, > "inputChangelogMode" : [ "INSERT" ], > "inputProperties" : [ { > "requiredDistribution" : { > "type" : "UNKNOWN" > }, > "damBehavior" : "PIPELINED", > "priority" : 0 > } ], > "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>", > "description" : "Sink(table=[default_catalog.default_database.bar], > fields=[f0, f1])" > } ], > "edges" : [ { > "source" : 1, > "target" : 2, > "shuffle" : { > "type" : "FORWARD" > }, > "shuffleMode" : "PIPELINED" > } ] > } > {code} > h4. Root Cause > ExecNodeBase#getContextFromAnnotation always uses the newest ExecNode version > for SerDe. 
As a result, although the deserialized CompiledPlan object is > correct, #printJsonString creates a new context with the newest version. > > h4. Suggested Fix > If the member variable `isCompiled` is true, then #getContextFromAnnotation > should return the context that was read from the JSON plan instead of > instantiating a new one. > -- This message was sent by Atlassian Jira (v8.20.10#820010)