发件人: Henry meng (孟令平) 发送时间: 2023年4月13日 15:27 收件人: 'user-zh-i...@flink.apache.org' <user-zh-i...@flink.apache.org> 主题:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.enableCheckpointing(60000); SingleOutputStreamOperator<String> dataStream = env.addSource(new FlinkRocketMQConsumer("10.164.15.31:9876","flink_data")) .uid("source-id"); dataStream .print("**********"); DataStream<InverterPo> datStream =dataStream.process(new MyFunction()); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); datStream.print("=========>"); Table inputTable = tableEnv.fromDataStream(datStream,"devId,identifier,dataValue,dataTime,tenantId");//"devId,identifier,dataValue,dataTime,tenantId" tableEnv.createTemporaryView("InputTable", inputTable); Table resultTable = tableEnv.sqlQuery( "select devId,'DAY_POWER_GENERATION' as identify, (MAX(dataValue)-Min(dataValue)) value1,MAX(dataTime) datatime, " + "tenantId from InputTable where identifier='TOTAL_POWER_GENERATION' group by devId,tenantId"); // resultTable.print(); DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable); resultStream.print("------------>"); // tableEnv.createTemporaryView("resultTable", resultTable); tableEnv.executeSql("CREATE TABLE print_table (`DEV_ID` BIGINT,`IDENTIFIER` String," + "`DATA_VALUE` Decimal(16,4),`DATA_TIME` TIMESTAMP,`TENANT_ID` BIGINT) " + "WITH ('connector' = 'print')"); // tableEnv.executeSql("insert into print_table select devId,identifier,dataValue,dataTime,tenantId from resultTable"); inputTable.executeInsert("print_table"); env.execute("FlinkRocketMQConsumerDemo");} 如上代码所示,当我添加上env.execute()方法后,发现print_table不打印数据了这是什么原因? StreamExecutionEnvironment.execute()和StreamTableEnvironment.executeSql()同时执行有什么问题吗?