hi sunfulin,

1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

详细可以参考 [1] [2]



对于 “No operators defined in streaming topology.”,如果使用
TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用
StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()
提交作业,就会出现前面的错误。

对于
“是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset

Best,
Godfrey

Leonard Xu <[email protected]> 于2020年7月12日周日 下午1:48写道:

> HI, fulin
>
> 能大致贴下代码吗?能复现异常即可。简单说下这两个方法,
>  TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是
> DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink
> job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …)
> 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。
> Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法,
> 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert
> tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”),
> TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job,
> 这应该不是用户需要的。
> 具体使用根据你的需要来使用。
>
>
> Best,
> Leonard Xu
>
>
> 在 2020年7月11日,22:24,sunfulin <[email protected]> 写道:
>
> statementset.execute
>
>
>

回复