Hi, try removing the final "env.execute("Flink Table to External System")" call.
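
For context: `executeInsert` submits the Table pipeline as a job on its own. A later `env.execute(...)` then finds no DataStream operators registered on the `StreamExecutionEnvironment`, which appears to be what the exception reports. Below is a minimal sketch of the program without that call. It assumes a bounded `fromElements` source and the built-in `print` connector as stand-ins for the original `source` and the StarRocks sink; neither comes from the original code.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object TableInsertWithoutEnvExecute {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Hypothetical bounded source standing in for the original `source`
    val source: DataStream[Long] = env.fromElements(1L, 2L, 3L)
    val inputTable = tableEnv.fromDataStream(source)

    // Assumption: the built-in 'print' connector replaces the StarRocks sink
    tableEnv.executeSql(
      """
        |CREATE TABLE active_users_bm (
        |  `dt` INT,
        |  `uid` BIGINT
        |) WITH (
        |  'connector' = 'print'
        |)
        |""".stripMargin)

    val resultTable =
      tableEnv.sqlQuery(s"SELECT 1 dt, CAST(1 AS BIGINT) uid FROM $inputTable")

    // executeInsert submits the job itself; await() only blocks until it finishes
    resultTable.executeInsert("active_users_bm").await()

    // No env.execute(...) here: nothing is left on `env` to run
  }
}
```

If the same program also builds a separate DataStream pipeline with its own sinks, `env.execute(...)` is still needed for that part.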

--

    Best!
    Xuyang

On 2024-12-20 15:49:11, "杨承波" <ycbch...@gmail.com> wrote:
>A question for everyone:
>Flink 1.15.4
>resultTable.executeInsert("active_users_bm")
>The execution plan shows only Sink-type operations, so why does this fail? The error is:
>No operators defined in streaming topology. Cannot execute.
>java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
>at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:2018)
>at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2009)
>at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1994)
>at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1833)
>at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)
>
>
>The code I run locally is:
>val settings = EnvironmentSettings.newInstance.inStreamingMode.build
>val tableEnv = StreamTableEnvironment.create(env, settings)
>val inputTable = tableEnv.fromDataStream(source)
>val ddl = """
>            |CREATE TABLE active_users_bm (
>            |       `dt` INT,
>            |       `uid` BIGINT
>            |    ) WITH (
>            |       'load-url' = 'xxxx:8030',
>            |       'jdbc-url' = 'jdbc:mysql://xxxx:9030',
>            |       'connector' = 'starrocks',
>            |       'sink.properties.columns' = 'dt, uid, ubm=to_bitmap(uid)',
>            |       'database-name' = 'xx',
>            |       'table-name' = 'active_users_bm',
>            |       'username' = 'user',
>            |       'password' = 'xxx'
>            |    )
>            |""".stripMargin
>
>// Register the sink table defined by the DDL above
>tableEnv.executeSql(ddl)
>// Run the query
>val resultTable = tableEnv.sqlQuery(s"SELECT 1 dt, 1 uid FROM $inputTable")
>// Write the result table to the external system
>resultTable.executeInsert("active_users_bm")
>// resultTable.execute().print()
>// Execute the program
>env.execute("Flink Table to External System")
>
>
>
>Execution plan log:
>[Source: xxx-source ->
>DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1,
>type=*xxx* NOT NULL, rowtime=false, watermark=false) -> Calc(select=[1 AS
>dt, 1:BIGINT AS uid]) -> Sink:
>Sink(table=[default_catalog.default_database.active_users_bm], fields=[dt,
>uid]) (1/1)#0] WARN org.apache.flink.metrics.MetricGroup - The operator
>name Sink: Sink(table=[default_catalog.default_database.active_users_bm],
>fields=[dt, uid]) exceeded the 80 characters length limit and was
>truncated.