Re: 回复:flink1.11 DDL定义kafka source报错

2020-08-07 文章 chengyanan1...@foxmail.com
你好:
你使用的是Flink 1.11版本,但是你的建表语句还是用的老版本,建议更换新版本的建表语句后再试一下
参考如下:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html






chengyanan1...@foxmail.com
 
发件人: 阿华田
发送时间: 2020-08-07 14:03
收件人: user-zh@flink.apache.org
主题: 回复:flink1.11 DDL定义kafka source报错
错误信息:
Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot generate StreamGraph.
at 
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)
 
 
 
 
代码:
public class DDLSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
String  create_sql=
"create table test\n" +
"(\n" +
"name varchar,\n" +
"city varchar\n" +
")with (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal',\n" +
"'connector.topic' = 'test',\n" +
"'connector.properties.0.key' = 'group.id',\n" +
"'connector.properties.0.value' = 'test_gd',\n" +
"'connector.properties.1.key' = 'bootstrap.servers',\n" +
"'connector.properties.1.value' = '127.0.0.1:9092',\n" +
"'connector.property-version' = '1',\n" +
"'connector.startup-mode' = 'latest-offset',\n" +
"'format.type' = 'json',\n" +
"'format.property-version' = '1',\n" +
"'format.derive-schema' = 'true',\n" +
"'update-mode' = 'append')";
 
tableEnv.executeSql(create_sql);
Table table = tableEnv.sqlQuery("select name from test ");
TableSchema schema = table.getSchema();
System.out.println(schema);
DataStream> tuple2DataStream = 
tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
tableEnv.execute("test");
//bsEnv.execute("fff");
}
}
 
 
 
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
 
 
在2020年08月7日 13:49,阿华田 写道:
代码如下
 
 
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
 


Re: flink1.11 DDL定义kafka source报错

2020-08-07 文章 chengyanan1...@foxmail.com
你好 :
图片是看不到的,建议直接粘贴文本再发送一次





chengyanan1...@foxmail.com
 
发件人: 阿华田
发送时间: 2020-08-07 13:49
收件人: user-zh
主题: flink1.11 DDL定义kafka source报错
代码如下

阿华田
a15733178...@163.com
签名由 网易邮箱大师 定制 



回复:flink1.11 DDL定义kafka source报错

2020-08-07 文章 阿华田
错误信息:
Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot generate StreamGraph.
at 
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)




代码:
public class DDLSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
String  create_sql=
"create table test\n" +
"(\n" +
"name varchar,\n" +
"city varchar\n" +
")with (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal',\n" +
"'connector.topic' = 'test',\n" +
"'connector.properties.0.key' = 'group.id',\n" +
"'connector.properties.0.value' = 'test_gd',\n" +
"'connector.properties.1.key' = 'bootstrap.servers',\n" +
"'connector.properties.1.value' = '127.0.0.1:9092',\n" +
"'connector.property-version' = '1',\n" +
"'connector.startup-mode' = 'latest-offset',\n" +
"'format.type' = 'json',\n" +
"'format.property-version' = '1',\n" +
"'format.derive-schema' = 'true',\n" +
"'update-mode' = 'append')";

tableEnv.executeSql(create_sql);
Table table = tableEnv.sqlQuery("select name from test ");
TableSchema schema = table.getSchema();
System.out.println(schema);
DataStream> tuple2DataStream = 
tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
tableEnv.execute("test");
//bsEnv.execute("fff");
}
}



| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年08月7日 13:49,阿华田 写道:
代码如下


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



flink1.11 DDL定义kafka source报错

2020-08-06 文章 阿华田
代码如下


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制