Hi, > 因为听说executeSql会提交任务,所以把最后一句execute给注销了。
> val result: Table = tEnv.sqlQuery(query)
> tEnv.toRetractStream[Row](result).print()
> // tEnv.execute("Flink SQL DDL")
DataStream程序的执行和Table/SQL程序的执行是解耦的,已经通过 tEnv.toRetractStream 转成 DataStrean
的程序后,需要调用 bsEnv.execute("test")
如果需要直接用SQL,可以直接:
tEnv.executeSql(query).print();
转换成datastream后应该类似这样
val result: Table = tEnv.sqlQuery(query)
tEnv.toRetractStream[Row](result).print()
bsEnv.execute("test”)
祝好,
Leonard
