Hi, 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。 请问有什么解决的方法吗?
在 2020-07-08 16:07:17,"Jingsong Li" <jingsongl...@gmail.com> 写道: >Hi, > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")" >并没有真正的物理节点。你不用再调用了。 > >Best, >Jingsong > >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach <wander...@163.com> wrote: > >> >> >> >> 代码结构改成这样的了: >> >> >> >> >> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment >> >> val blinkEnvSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >> >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, >> blinkEnvSettings) >> >> >> >> >> >> streamExecutionEnv.execute("from kafka sink hbase") >> >> >> >> >> 还是报一样的错 >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-08 15:40:41,"夏帅" <jkill...@dingtalk.com.INVALID> 写道: >> >你好, >> >可以看看你的代码结构是不是以下这种 >> > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >> > val bsSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build >> > val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) >> > ...... >> > tableEnv.execute("") >> >如果是的话,可以尝试使用bsEnv.execute("") >> >1.11对于两者的execute代码实现有改动 >> > >> > >> >------------------------------------------------------------------ >> >发件人:Zhou Zach <wander...@163.com> >> >发送时间:2020年7月8日(星期三) 15:30 >> >收件人:Flink user-zh mailing list <user-zh@flink.apache.org> >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology >> > >> >代码在flink >> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: >> >Exception in thread "main" java.lang.IllegalStateException: No operators >> defined in streaming topology. Cannot generate StreamGraph. >> >at >> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) >> >at >> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) >> >at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) >> > >> > >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 >> > >> > >> > >> > >> >query: >> >streamTableEnv.executeSql( >> > """ >> > | >> > |CREATE TABLE `user` ( >> > | uid BIGINT, >> > | sex VARCHAR, >> > | age INT, >> > | created_time TIMESTAMP(3), >> > | WATERMARK FOR created_time as created_time - INTERVAL '3' >> SECOND >> > |) WITH ( >> > | 'connector.type' = 'kafka', >> > | 'connector.version' = 'universal', >> > | -- 'connector.topic' = 'user', >> > | 'connector.topic' = 'user_long', >> > | 'connector.startup-mode' = 'latest-offset', >> > | 'connector.properties.group.id' = 'user_flink', >> > | 'format.type' = 'json', >> > | 'format.derive-schema' = 'true' >> > |) >> > |""".stripMargin) >> > >> > >> > >> > >> > >> > >> > streamTableEnv.executeSql( >> > """ >> > | >> > |CREATE TABLE user_hbase3( >> > | rowkey BIGINT, >> > | cf ROW(sex VARCHAR, age INT, created_time VARCHAR) >> > |) WITH ( >> > | 'connector.type' = 'hbase', >> > | 'connector.version' = '2.1.0', >> > | 'connector.table-name' = 'user_hbase2', >> > | 'connector.zookeeper.znode.parent' = '/hbase', >> > | 'connector.write.buffer-flush.max-size' = '10mb', >> > | 'connector.write.buffer-flush.max-rows' = '1000', >> > | 'connector.write.buffer-flush.interval' = '2s' >> > |) >> > |""".stripMargin) >> > >> > >> > streamTableEnv.executeSql( >> > """ >> > | >> > |insert into user_hbase3 >> > |SELECT uid, >> > | >> > | ROW(sex, age, created_time ) as cf >> > | FROM (select uid,sex,age, cast(created_time as VARCHAR) as >> created_time from `user`) >> > | >> > |""".stripMargin) >> > >> > >> > >> > >> > >> > >> > >> > >> > > >-- >Best, Jingsong Lee