Hi,

My application uses Flink SQL, and I want to add a new SQL query to it.
For example, the first version is:

    DataStream<AggregatedOrderItems> paymentCompleteStream =
        getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
            .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
            .assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
            .returns(TypeInformation.of(AggregatedOrderItems.class));

    tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
        concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));

    tableEnv.registerFunction("group_concat", new GroupConcatFunction());

    Table resultTable = tableEnv.sqlQuery(sql1);
    tableEnv.toAppendStream(resultTable, Row.class, qConfig)
        .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
                return (value.getViolationCount() >= 5);
            }
        }).addSink(new DetectionResultMySqlSink());

Then, in the second version, I add a new SQL query:

    Table resultTable2 = tableEnv.sqlQuery(sql2);
    tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
        .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
                return (value.getViolationCount() >= 5);
            }
        }).addSink(new DetectionResultMySqlSink());

After I restart the job from a savepoint, can the state of the original Flink SQL query be restored successfully? Will Flink assign new UIDs to the operators of the original SQL query? (I will not change the original SQL.)

Regards,
James