[ https://issues.apache.org/jira/browse/FLINK-32374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

luoyuxia reassigned FLINK-32374:
--------------------------------

    Assignee: Jane Chan

> ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-32374
>                 URL: https://issues.apache.org/jira/browse/FLINK-32374
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0, 1.17.0, 1.16.1, 1.16.2, 1.18.0, 1.17.1
>            Reporter: Jane Chan
>            Assignee: Jane Chan
>            Priority: Major
>             Fix For: 1.18.0
>
>
> If the existing JSON plan file is not truncated when it is overwritten, and the
> newly generated JSON plan content is shorter than the previous content, the
> stale tail of the old plan is left behind and the file is no longer valid JSON.
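> Below is a minimal, self-contained sketch of the failure mode using plain
> {{java.nio}} (this is not the actual ExecNodeGraphInternalPlan code; the path
> and payloads are made up for illustration):
> {code:java}
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardOpenOption;
> 
> public class StaleTailDemo {
>     public static void main(String[] args) throws IOException {
>         Path plan = Path.of("/tmp/debug.json"); // hypothetical path
> 
>         // First compile: a longer plan is written (Files.write with no
>         // options truncates by default, so the file starts out clean).
>         Files.write(plan, "{\"plan\":\"something fairly long\"}"
>                 .getBytes(StandardCharsets.UTF_8));
> 
>         // Recompile WITHOUT TRUNCATE_EXISTING: only the leading bytes are
>         // overwritten and the tail of the longer first plan survives.
>         Files.write(plan, "{\"plan\":\"short\"}".getBytes(StandardCharsets.UTF_8),
>                 StandardOpenOption.CREATE, StandardOpenOption.WRITE);
> 
>         // Prints {"plan":"short"}ng fairly long"} -- no longer valid JSON.
>         System.out.println(Files.readString(plan));
>     }
> }
> {code}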
> h4. How to reproduce
> {code:sql}
> Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 'blackhole');
> [INFO] Execute statement succeed.
> Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) with ('connector' = 'datagen');
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
> [INFO] Execute statement succeed.
> Flink SQL> set 'table.plan.force-recompile' = 'true';
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select * from (values (2, 'bye')) T (id, message);
> [INFO] Execute statement succeed.
> {code}
> Run {{cat -n debug.json}} and check L#67, where the recompiled (shorter) plan ends and leftover content from the previous plan begins:
> {code:json}
>      1        {
>      2          "flinkVersion" : "1.17",
>      3          "nodes" : [ {
>      4            "id" : 15,
>      5            "type" : "stream-exec-values_1",
>      6            "tuples" : [ [ {
>      7              "kind" : "LITERAL",
>      8              "value" : "2",
>      9              "type" : "INT NOT NULL"
>     10            }, {
>     11              "kind" : "LITERAL",
>     12              "value" : "bye",
>     13              "type" : "CHAR(3) NOT NULL"
>     14            } ] ],
>     15            "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
>     16            "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
>     17            "inputProperties" : [ ]
>     18          }, {
>     19            "id" : 16,
>     20            "type" : "stream-exec-sink_1",
>     21            "configuration" : {
>     22              "table.exec.sink.keyed-shuffle" : "AUTO",
>     23              "table.exec.sink.not-null-enforcer" : "ERROR",
>     24              "table.exec.sink.type-length-enforcer" : "IGNORE",
>     25              "table.exec.sink.upsert-materialize" : "AUTO"
>     26            },
>     27            "dynamicTableSink" : {
>     28              "table" : {
>     29                "identifier" : "`default_catalog`.`default_database`.`debug_sink`",
>     30                "resolvedTable" : {
>     31                  "schema" : {
>     32                    "columns" : [ {
>     33                      "name" : "f0",
>     34                      "dataType" : "INT"
>     35                    }, {
>     36                      "name" : "f1",
>     37                      "dataType" : "VARCHAR(2147483647)"
>     38                    } ],
>     39                    "watermarkSpecs" : [ ]
>     40                  },
>     41                  "partitionKeys" : [ ],
>     42                  "options" : {
>     43                    "connector" : "blackhole"
>     44                  }
>     45                }
>     46              }
>     47            },
>     48            "inputChangelogMode" : [ "INSERT" ],
>     49            "inputProperties" : [ {
>     50              "requiredDistribution" : {
>     51                "type" : "UNKNOWN"
>     52              },
>     53              "damBehavior" : "PIPELINED",
>     54              "priority" : 0
>     55            } ],
>     56            "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
>     57            "description" : "Sink(table=[default_catalog.default_database.debug_sink], fields=[id, message])"
>     58          } ],
>     59          "edges" : [ {
>     60            "source" : 15,
>     61            "target" : 16,
>     62            "shuffle" : {
>     63              "type" : "FORWARD"
>     64            },
>     65            "shuffleMode" : "PIPELINED"
>     66          } ]
>     67        } "$CONCAT$1",
>     68              "operands" : [ {
>     69                "kind" : "INPUT_REF",
>     70                "inputIndex" : 2,
>     71                "type" : "VARCHAR(2147483647)"
>     72              }, {
>     73                "kind" : "INPUT_REF",
>     74                "inputIndex" : 3,
>     75                "type" : "VARCHAR(2147483647)"
>     76              } ],
>     77              "type" : "VARCHAR(2147483647)"
>     78            } ],
>     79            "condition" : null,
>     80            "inputProperties" : [ {
>     81              "requiredDistribution" : {
>     82                "type" : "UNKNOWN"
>     83              },
>     84              "damBehavior" : "PIPELINED",
>     85              "priority" : 0
>     86            } ],
>     87            "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
>     88            "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0, CONCAT(f2, f3) AS f1])"
>     89          }, {
>     90            "id" : 14,
>     91            "type" : "stream-exec-sink_1",
>     92            "configuration" : {
>     93              "table.exec.sink.keyed-shuffle" : "AUTO",
>     94              "table.exec.sink.not-null-enforcer" : "ERROR",
>     95              "table.exec.sink.type-length-enforcer" : "IGNORE",
>     96              "table.exec.sink.upsert-materialize" : "AUTO"
>     97            },
>     98            "dynamicTableSink" : {
>     99              "table" : {
>    100                "identifier" : "`default_catalog`.`default_database`.`debug_sink`",
>    101                "resolvedTable" : {
>    102                  "schema" : {
>    103                    "columns" : [ {
>    104                      "name" : "f0",
>    105                      "dataType" : "INT"
>    106                    }, {
>    107                      "name" : "f1",
>    108                      "dataType" : "VARCHAR(2147483647)"
>    109                    } ],
>    110                    "watermarkSpecs" : [ ]
>    111                  },
>    112                  "partitionKeys" : [ ],
>    113                  "options" : {
>    114                    "connector" : "blackhole"
>    115                  }
>    116                }
>    117              }
>    118            },
>    119            "inputChangelogMode" : [ "INSERT" ],
>    120            "inputProperties" : [ {
>    121              "requiredDistribution" : {
>    122                "type" : "UNKNOWN"
>    123              },
>    124              "damBehavior" : "PIPELINED",
>    125              "priority" : 0
>    126            } ],
>    127            "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
>    128            "description" : "Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])"
>    129          } ],
>    130          "edges" : [ {
>    131            "source" : 12,
>    132            "target" : 13,
>    133            "shuffle" : {
>    134              "type" : "FORWARD"
>    135            },
>    136            "shuffleMode" : "PIPELINED"
>    137          }, {
>    138            "source" : 13,
>    139            "target" : 14,
>    140            "shuffle" : {
>    141              "type" : "FORWARD"
>    142            },
>    143            "shuffleMode" : "PIPELINED"
>    144          } ]
>    145        }
> {code}
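> Everything from L#68 onward is the leftover tail of the first (longer) compiled
> plan; the recompiled plan itself ends at L#67. A hedged sketch of the fix, i.e.
> opening the file with {{StandardOpenOption.TRUNCATE_EXISTING}} as the issue
> title suggests (class and method names here are hypothetical, not the actual
> Flink code):
> {code:java}
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardOpenOption;
> 
> final class PlanFileWriter { // hypothetical name
>     // TRUNCATE_EXISTING cuts an existing file to length 0 before writing,
>     // so a shorter recompiled plan cannot leave stale bytes behind.
>     static void writePlan(Path file, String json) throws IOException {
>         Files.write(
>                 file,
>                 json.getBytes(StandardCharsets.UTF_8),
>                 StandardOpenOption.CREATE,
>                 StandardOpenOption.WRITE,
>                 StandardOpenOption.TRUNCATE_EXISTING);
>     }
> }
> {code}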


