错误信息:
Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot generate StreamGraph.
at 
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)




代码:
public class DDLSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
String  create_sql=
"create table test\n" +
"(\n" +
"name varchar,\n" +
"city varchar\n" +
")with (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal',\n" +
"'connector.topic' = 'test',\n" +
"'connector.properties.0.key' = 'group.id',\n" +
"'connector.properties.0.value' = 'test_gd',\n" +
"'connector.properties.1.key' = 'bootstrap.servers',\n" +
"'connector.properties.1.value' = '127.0.0.1:9092',\n" +
"'connector.property-version' = '1',\n" +
"'connector.startup-mode' = 'latest-offset',\n" +
"'format.type' = 'json',\n" +
"'format.property-version' = '1',\n" +
"'format.derive-schema' = 'true',\n" +
"'update-mode' = 'append')";

tableEnv.executeSql(create_sql);
Table table = tableEnv.sqlQuery("select name from test ");
TableSchema schema = table.getSchema();
System.out.println(schema);
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = 
tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
tableEnv.execute("test");
//bsEnv.execute("fff");
}
}



| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年08月7日 13:49,阿华田<a15733178...@163.com> 写道:
代码如下


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制

回复