EnvironmentSettings sett = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, sett);
//当程序全部使用ddl方式执行的时候 streamTableEnv.executeSql(DDLSourceSQLManager.createTableUseDDLFromKafka(); streamTableEnv.executeSql(com.ddlsql.DDLSourceSQLManager.createPrintlnRetractSinkTbl("printTable")); streamTableEnv.executeSql("insert into printTable select msg,count(*) as cnt from test group by msg"); // 以下两个都报错 streamExecutionEnvironment.execute(""); streamTableEnv.execute(""); // ---------------------------------------------- //但是当我创建test表的方式改为就可以运行 Table a = tableEnv.fromDataStream(source, "topic,offset,ts,date,msg") tableEnv.createTemporaryView("test", a); streamTableEnv.executeSql(com.ddlsql.DDLSourceSQLManager.createPrintlnRetractSinkTbl("printTable")); streamTableEnv.executeSql("insert into printTable select msg,count(*) as cnt from test group by msg"); streamExecutionEnvironment.execute(""); // 但是当我 以下两个同时存在也可以正常运行。 Table a = tableEnv.fromDataStream(source, "topic,offset,ts,date,msg") streamTableEnvironment.createTemporaryView("tablename随便取", a); streamTableEnvironment.executeSql(DDLSourceSQLManager.createStreamFromKafka(); 问题: 1:在DDL的程序中,必须要有DataStream才能执行? 2:streamTableEnv.executeSql() 能立马执行,不需要 streamTableEnv.execute(""),是在 batch场景?我在流场景都是需要 execute -- Sent from: http://apache-flink.147419.n8.nabble.com/