Hi:

   My application uses Flink SQL, and I want to add a new SQL query to the application.

For example, the first version is:

DataStream<AggregatedOrderItems> paymentCompleteStream =
        getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
                .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
                .assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
                .returns(TypeInformation.of(AggregatedOrderItems.class));

tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
        concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));

tableEnv.registerFunction("group_concat", new GroupConcatFunction());

Table resultTable = tableEnv.sqlQuery(sql1);
tableEnv.toAppendStream(resultTable, Row.class, qConfig)
        .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
               return (value.getViolationCount() >= 5);
            }
        }).addSink(new DetectionResultMySqlSink());

Then, in the second version, I add a new SQL query:

Table resultTable2 = tableEnv.sqlQuery(sql2);
tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
        .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
                return (value.getViolationCount() >= 5);
            }
        }).addSink(new DetectionResultMySqlSink());

After restarting the job from a savepoint, can the state of the original Flink SQL query be restored successfully? Will Flink assign new UIDs to the operators of the original SQL query? (I will not change the original SQL.)
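For the DataStream operators around the SQL query, I could pin explicit UIDs myself so their state matches deterministically after a restore. Below is a sketch of what I mean (the UID strings are placeholder names I made up); my uncertainty is about the operators that the SQL planner generates internally, which I cannot name this way:

DataStream<AggregatedOrderItems> paymentCompleteStream =
        getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
                .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
                .uid("payment-complete-flatmap")   // explicit, stable UID
                .assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
                .uid("payment-complete-watermarks")
                .returns(TypeInformation.of(AggregatedOrderItems.class));

tableEnv.toAppendStream(resultTable, Row.class, qConfig)
        .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
        .uid("e5-flatmap")
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
                return (value.getViolationCount() >= 5);
            }
        })
        .uid("violation-filter")
        .addSink(new DetectionResultMySqlSink())
        .uid("detection-result-sink");   // DataStreamSink also accepts a UID

But the SQL part itself has no such handle, so I do not know whether adding sql2 changes the auto-generated UIDs of the sql1 operators.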

Regards

James
