发件人: Henry meng (孟令平)
发送时间: 2023年4月13日 15:27
收件人: 'user-zh-i...@flink.apache.org'
主题:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//env.enableCheckpointing(6);
SingleOutputStreamOperator dataStream = env.addSource(new
FlinkRocketMQConsumer("10.164.15.31:9876","flink_data"))
.uid("source-id");
dataStream .print("**");
DataStream datStream =dataStream.process(new MyFunction());
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
datStream.print("=>");
Table inputTable =
tableEnv.fromDataStream(datStream,"devId,identifier,dataValue,dataTime,tenantId");//"devId,identifier,dataValue,dataTime,tenantId"
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery(
"select devId,'DAY_POWER_GENERATION' as identify,
(MAX(dataValue)-Min(dataValue)) value1,MAX(dataTime) datatime, " +
"tenantId from InputTable where
identifier='TOTAL_POWER_GENERATION' group by devId,tenantId");
//resultTable.print();
DataStream resultStream = tableEnv.toChangelogStream(resultTable);
resultStream.print(">");
//tableEnv.createTemporaryView("resultTable", resultTable);
tableEnv.executeSql("CREATE TABLE print_table (`DEV_ID`
BIGINT,`IDENTIFIER` String," +
"`DATA_VALUE` Decimal(16,4),`DATA_TIME` TIMESTAMP,`TENANT_ID`
BIGINT) " +
"WITH ('connector' = 'print')");
//tableEnv.executeSql("insert into print_table select
devId,identifier,dataValue,dataTime,tenantId from resultTable");
inputTable.executeInsert("print_table");
env.execute("FlinkRocketMQConsumerDemo");}
如上代码所示,当我添加上env.execute()方法后,发现print_table不打印数据了这是什么原因?
StreamExecutionEnvironment.execute()和StreamTableEnvironment.executeSql()同时执行有什么问题吗?