发件人: Henry meng (孟令平)
发送时间: 2023年4月13日 15:27
收件人: 'user-zh-i...@flink.apache.org' <user-zh-i...@flink.apache.org>
主题:


public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
//    env.enableCheckpointing(60000);
      SingleOutputStreamOperator<String> dataStream = env.addSource(new 
FlinkRocketMQConsumer("10.164.15.31:9876","flink_data"))
            .uid("source-id");
      dataStream .print("**********");
      DataStream<InverterPo> datStream =dataStream.process(new MyFunction());
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
      datStream.print("=========>");
      Table inputTable = 
tableEnv.fromDataStream(datStream,"devId,identifier,dataValue,dataTime,tenantId");//"devId,identifier,dataValue,dataTime,tenantId"
      tableEnv.createTemporaryView("InputTable", inputTable);
      Table resultTable = tableEnv.sqlQuery(
            "select devId,'DAY_POWER_GENERATION' as identify, 
(MAX(dataValue)-Min(dataValue)) value1,MAX(dataTime) datatime, " +
                  "tenantId from InputTable where 
identifier='TOTAL_POWER_GENERATION' group by devId,tenantId");
//    resultTable.print();
      DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
      resultStream.print("------------>");
//    tableEnv.createTemporaryView("resultTable", resultTable);

      tableEnv.executeSql("CREATE TABLE print_table (`DEV_ID` 
BIGINT,`IDENTIFIER` String," +
            "`DATA_VALUE` Decimal(16,4),`DATA_TIME` TIMESTAMP,`TENANT_ID` 
BIGINT) " +
            "WITH ('connector' = 'print')");
//    tableEnv.executeSql("insert into print_table select 
devId,identifier,dataValue,dataTime,tenantId from resultTable");
      inputTable.executeInsert("print_table");
      env.execute("FlinkRocketMQConsumerDemo");}

如上代码所示,当我添加上 env.execute() 方法后,发现 print_table 不再打印数据了,这是什么原因?
StreamExecutionEnvironment.execute() 和 StreamTableEnvironment.executeSql() 同时执行会有什么问题吗?

回复