Hi, 删掉最后的“env.execute("Flink Table to External System")”试一下
-- Best! Xuyang 在 2024-12-20 15:49:11,"杨承波" <ycbch...@gmail.com> 写道: >请教一下各位大佬: >flink1.15.4 >resultTable.executeInsert("active_users_bm") >在执行计划里显示的都是Sink类型操作,为什么会报错,报错如下: >No operators defined in streaming topology. Cannot execute. >java.lang.IllegalStateException: No operators defined in streaming >topology. Cannot execute. >at >org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:2018) >at >org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2009) >at >org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1994) >at >org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1833) >at >org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) > > >本地执行代码如下: >val settings = EnvironmentSettings.newInstance.inStreamingMode.build >val tableEnv = StreamTableEnvironment.create(env, settings) >val inputTable = tableEnv.fromDataStream(source) >val ddl = """ > |CREATE TABLE active_users_bm ( > | `dt` INT, > | `uid` BIGINT > | ) WITH ( > | 'load-url' = 'xxxx:8030', > | 'jdbc-url' = 'jdbc:mysql://xxxx:9030', > | 'connector' = 'starrocks', > | 'sink.properties.columns' = 'dt, uid, >ubm=to_bitmap(uid)', > | 'database-name' = 'xx', > | 'table-name' = 'active_users_bm', > | 'username' = 'user', > | 'password' = 'xxx' > | ) > |""".stripMargin > >// 创建流执行环境和 Table 环境 >tableEnv.executeSql(ddl) >// 执行查询 >val resultTable = tableEnv.sqlQuery(s"SELECT 1 dt, 1 uid FROM $inputTable") >// 将结果表写入外部系统 >resultTable.executeInsert("active_users_bm") >// resultTable.execute().print() >// 执行程序 >env.execute("Flink Table to External System") > > > >执行计划日志: >[Source: xxx-source -> >DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, >type=*xxx* NOT NULL, rowtime=false, watermark=false) -> Calc(select=[1 AS >dt, 1:BIGINT AS uid]) -> Sink: >Sink(table=[default_catalog.default_database.active_users_bm], fields=[dt, >uid]) (1/1)#0] WARN org.apache.flink.metrics.MetricGroup - The operator >name Sink: Sink(table=[default_catalog.default_database.active_users_bm], >fields=[dt, uid]) exceeded the 80 characters length limit and was >truncated.