hi sunfulin, 1.11 对 StreamTableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 的执行方式有所调整, 简单概述为: 1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业; 2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业; 3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业 (异步提交作业),不需要再调用 StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()
详细可以参考 [1] [2] 对于 “No operators defined in streaming topology.”,如果使用 TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用 StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute() 提交作业,就会出现前面的错误。 对于 “是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset Best, Godfrey Leonard Xu <[email protected]> 于2020年7月12日周日 下午1:48写道: > HI, fulin > > 能大致贴下代码吗?能复现异常即可。简单说下这两个方法, > TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 > DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink > job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) > 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。 > Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, > 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert > tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), > TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, > 这应该不是用户需要的。 > 具体使用根据你的需要来使用。 > > > Best, > Leonard Xu > > > 在 2020年7月11日,22:24,sunfulin <[email protected]> 写道: > > statementset.execute > > >
