Hi,
I'd like to ask: with streamExecutionEnv.execute("from kafka sink hbase") you can specify the job name, but when switching to streamTableEnv.executeSql(sql) there seems to be no way to set it.
Is there a way around this?
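
One workaround worth trying, sketched here under the assumption that your release honors the pipeline.name option for Table API jobs (I believe this is what FLINK-18545 tracks; please verify against the version you run):

    // set the job name via the pipeline.name option before calling
    // executeSql; if the option is supported by your version, the
    // submitted job is named accordingly
    streamTableEnv.getConfig.getConfiguration
      .setString("pipeline.name", "from kafka sink hbase")

If your version does not pick this up, converting the result table to a DataStream and submitting via streamExecutionEnv.execute("from kafka sink hbase") still lets you name the job.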

On 2020-07-08 16:07:17, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>Hi,
>
>In your code, streamTableEnv.executeSql means the statement has already been
>submitted to the cluster and is executing asynchronously.
>
>So your later streamExecutionEnv.execute("from kafka sink hbase") has no
>physical operators left to run. There is no need to call it anymore.
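>
>A minimal sketch of what that means in practice (insertSql stands in for
>your INSERT statement):
>
>    // executeSql submits the statement right away and returns a TableResult
>    val result = streamTableEnv.executeSql(insertSql)
>    // the already-submitted job is reachable through its JobClient
>    if (result.getJobClient.isPresent) {
>      println(result.getJobClient.get().getJobID)
>    }
>
>No streamExecutionEnv.execute(...) call is needed afterwards.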
>
>Best,
>Jingsong
>
>On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach <wander...@163.com> wrote:
>
>>
>>
>>
>> I changed the code structure to this:
>>
>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
>>
>> streamExecutionEnv.execute("from kafka sink hbase")
>>
>> It still reports the same error.
>>
>> On 2020-07-08 15:40:41, "夏帅" <jkill...@dingtalk.com.INVALID> wrote:
>> >Hi,
>> >Check whether your code structure looks like this:
>> >    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> >    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >    val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>> >    ......
>> >    tableEnv.execute("")
>> >If so, try bsEnv.execute("") instead.
>> >The execute implementations of both changed in 1.11.
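>> >For example (a sketch; insertSql stands in for your actual INSERT statement):
>> >    // pre-1.11: sqlUpdate only buffered the statement, and
>> >    // tableEnv.execute("job name") triggered the actual job
>> >    // 1.11: executeSql submits the job by itself
>> >    tableEnv.executeSql(insertSql)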
>> >
>> >
>> >------------------------------------------------------------------
>> >From: Zhou Zach <wander...@163.com>
>> >Sent: Wednesday, July 8, 2020 15:30
>> >To: Flink user-zh mailing list <user-zh@flink.apache.org>
>> >Subject: flink Sql 1.11 executeSql reports No operators defined in streaming topology
>> >
>> >The code ran fine on flink 1.10.1. After upgrading to 1.11.0, streamTableEnv.sqlUpdate was reported as deprecated, so I changed it to executeSql. About 2 seconds after the program starts, it throws:
>> >Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
>> >at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
>> >at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
>> >at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
>> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
>> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
>> >
>> >
>> >However, the data is sunk to hbase correctly. Could executeSql be raising a false alarm...
>> >
>> >
>> >
>> >
>> >query:
>> >streamTableEnv.executeSql(
>> >      """
>> >        |
>> >        |CREATE TABLE `user` (
>> >        |    uid BIGINT,
>> >        |    sex VARCHAR,
>> >        |    age INT,
>> >        |    created_time TIMESTAMP(3),
>> >        |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>> >        |) WITH (
>> >        |    'connector.type' = 'kafka',
>> >        |    'connector.version' = 'universal',
>> >        |    -- 'connector.topic' = 'user',
>> >        |    'connector.topic' = 'user_long',
>> >        |    'connector.startup-mode' = 'latest-offset',
>> >        |    'connector.properties.group.id' = 'user_flink',
>> >        |    'format.type' = 'json',
>> >        |    'format.derive-schema' = 'true'
>> >        |)
>> >        |""".stripMargin)
>> >
>> >
>> >
>> >
>> >
>> >
>> >    streamTableEnv.executeSql(
>> >      """
>> >        |
>> >        |CREATE TABLE user_hbase3(
>> >        |    rowkey BIGINT,
>> >        |    cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>> >        |) WITH (
>> >        |    'connector.type' = 'hbase',
>> >        |    'connector.version' = '2.1.0',
>> >        |    'connector.table-name' = 'user_hbase2',
>> >        |    'connector.zookeeper.znode.parent' = '/hbase',
>> >        |    'connector.write.buffer-flush.max-size' = '10mb',
>> >        |    'connector.write.buffer-flush.max-rows' = '1000',
>> >        |    'connector.write.buffer-flush.interval' = '2s'
>> >        |)
>> >        |""".stripMargin)
>> >
>> >
>> >    streamTableEnv.executeSql(
>> >      """
>> >        |
>> >        |insert into user_hbase3
>> >        |SELECT uid,
>> >        |
>> >        |  ROW(sex, age, created_time ) as cf
>> >        |  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as created_time from `user`)
>> >        |
>> >        |""".stripMargin)
>>
>
>
>-- 
>Best, Jingsong Lee
