Re: Sql Client读取Kafka报错:Could not find any factory for identifier 'kafka'

2021-01-11 文章 zhisheng
hello

你放 flink-sql-connector-kafka_2.11-1.11.3.jar  后有重启 sql client 和 集群吗?

Best
zhisheng

air23  于2021年1月11日周一 下午1:32写道:

> 下载个 flink-sql-connector-kafka 这个jar 放在lib下试下
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-09 02:08:12,"inza9hi"  写道:
> >搜了下之前的邮件,貌似没有发现和我同样的问题。
> >
> >lib 下的Jar
> >flink-csv-1.11.3.jar
> >flink-table-blink_2.11-1.11.3.jar
> >flink-dist_2.11-1.11.3.jar
> >flink-table_2.11-1.11.3.jar
> >flink-jdbc_2.11-1.11.3.jar
>  log4j-1.2-api-2.12.1.jar
> >flink-json-1.11.3.jar  log4j-api-2.12.1.jar
> >flink-shaded-zookeeper-3.4.14.jar  log4j-core-2.12.1.jar
> >flink-sql-connector-elasticsearch6_2.11-1.11.3.jar
> >log4j-slf4j-impl-2.12.1.jar
> >flink-sql-connector-kafka_2.11-1.11.3.jar
> >mysql-connector-java-5.1.48.jar
> >
> >flink bin/sql-client.sh embedded
> >
> >CREATE TABLE user_behavior (
> >user_id BIGINT,
> >item_id BIGINT,
> >category_id BIGINT,
> >behavior STRING,
> >ts TIMESTAMP(3),
> >proctime as PROCTIME(),  -- 通过计算列产生一个处理时间列
> >WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
> >在ts上定义watermark,ts成为事件时间列
> >
> >) WITH (
> >'connector' = 'kafka',  -- 使用 kafka connector
> >'topic' = 'data_test',  -- kafka topic
> >'startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> >'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
> >'format' = 'json'  -- 数据源格式为 json
> >);
> >
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re:Sql Client读取Kafka报错:Could not find any factory for identifier 'kafka'

2021-01-10 文章 air23
下载个 flink-sql-connector-kafka 这个jar 放在lib下试下

















在 2021-01-09 02:08:12,"inza9hi"  写道:
>搜了下之前的邮件,貌似没有发现和我同样的问题。
>
>lib 下的Jar   
>flink-csv-1.11.3.jar  
>flink-table-blink_2.11-1.11.3.jar
>flink-dist_2.11-1.11.3.jar
>flink-table_2.11-1.11.3.jar
>flink-jdbc_2.11-1.11.3.jar log4j-1.2-api-2.12.1.jar
>flink-json-1.11.3.jar  log4j-api-2.12.1.jar
>flink-shaded-zookeeper-3.4.14.jar  log4j-core-2.12.1.jar
>flink-sql-connector-elasticsearch6_2.11-1.11.3.jar
>log4j-slf4j-impl-2.12.1.jar
>flink-sql-connector-kafka_2.11-1.11.3.jar
>mysql-connector-java-5.1.48.jar
>
>flink bin/sql-client.sh embedded
>
>CREATE TABLE user_behavior (
>user_id BIGINT,
>item_id BIGINT,
>category_id BIGINT,
>behavior STRING,
>ts TIMESTAMP(3),
>proctime as PROCTIME(),  -- 通过计算列产生一个处理时间列
>WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
>在ts上定义watermark,ts成为事件时间列
>
>) WITH (
>'connector' = 'kafka',  -- 使用 kafka connector
>'topic' = 'data_test',  -- kafka topic
>'startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
>'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
>'format' = 'json'  -- 数据源格式为 json
>);
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Sql Client读取Kafka报错:Could not find any factory for identifier 'kafka'

2021-01-09 文章 inza9hi
搜了下之前的邮件,貌似没有发现和我同样的问题。

lib 下的Jar   
flink-csv-1.11.3.jar  
flink-table-blink_2.11-1.11.3.jar
flink-dist_2.11-1.11.3.jar
flink-table_2.11-1.11.3.jar
flink-jdbc_2.11-1.11.3.jar log4j-1.2-api-2.12.1.jar
flink-json-1.11.3.jar  log4j-api-2.12.1.jar
flink-shaded-zookeeper-3.4.14.jar  log4j-core-2.12.1.jar
flink-sql-connector-elasticsearch6_2.11-1.11.3.jar
log4j-slf4j-impl-2.12.1.jar
flink-sql-connector-kafka_2.11-1.11.3.jar
mysql-connector-java-5.1.48.jar

flink bin/sql-client.sh embedded

CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),  -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
在ts上定义watermark,ts成为事件时间列

) WITH (
'connector' = 'kafka',  -- 使用 kafka connector
'topic' = 'data_test',  -- kafka topic
'startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
'format' = 'json'  -- 数据源格式为 json
);




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Re: Could not find any factory for identifier 'kafka'

2020-07-27 文章 RS
Hi,
1. 好的,学习了
2. 
确实,部分Flink依赖调整为provided,打包测试也可以正常执行,但是flink-walkthrough-common_2.11这种包在Flink的lib中没有看到,还是打包进去了




在 2020-07-27 11:42:50,"Caizhi Weng"  写道:
>Hi,
>
>Flink 的 TableFactory 利用了 Java 的服务发现功能,所以需要这两个文件。需要确认 jar-with-dependencies
>是否能把这些资源文件打进去。
>
>另外为什么需要把 Flink 的依赖也打在大包里呢?因为 Flink 本身的 classpath 里就已经有这些依赖了,这个大包作为 Flink
>的用户 jar 的话,并不需要把 Flink 的依赖也放进去。
>
>RS  于2020年7月24日周五 下午8:30写道:
>
>> hi,
>> 感谢回复,尝试了多次之后,发现应该不是依赖包的问题
>>
>>
>> 我项目中新增目录:resources/META-INF/services
>> 然后从Flink源码中复制了2个文件
>> org.apache.flink.table.factories.TableFactory和org.apache.flink.table.factories.Factory
>> 这样编译就不会报错了,原理不太清楚,但是确实解决了报错的问题。
>>
>>
>> 在 2020-07-24 20:16:18,"JasonLee" <17610775...@163.com> 写道:
>> >hi
>> >只需要-sql和-json两个包就可以了
>> >
>> >
>> >| |
>> >JasonLee
>> >|
>> >|
>> >邮箱:17610775...@163.com
>> >|
>> >
>> >Signature is customized by Netease Mail Master
>> >
>> >On 07/24/2020 17:02, RS wrote:
>> >hi,
>> >Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
>> >编译的jar包是jar-with-dependencies的
>> >
>> >
>> >代码片段:
>> >   public String ddlSql = String.format("CREATE TABLE %s (\n" +
>> >   "  number BIGINT,\n" +
>> >   "  msg STRING,\n" +
>> >   "  username STRING,\n" +
>> >   "  update_time TIMESTAMP(3)\n" +
>> >   ") WITH (\n" +
>> >   " 'connector' = 'kafka',\n" +
>> >   " 'topic' = '%s',\n" +
>> >   " 'properties.bootstrap.servers' = '%s',\n" +
>> >   " 'properties.group.id' = '%s',\n" +
>> >   " 'format' = 'json',\n" +
>> >   " 'json.fail-on-missing-field' = 'false',\n" +
>> >   " 'json.ignore-parse-errors' = 'true'\n" +
>> >   ")\n", tableName, topic, servers, group);
>> >
>> >
>> >   StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> >   StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(env);
>> >   tableEnv.executeSql(ddlSql);
>> >
>> >
>> >报错信息:
>> >Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.
>> >Available factory identifiers are:
>> >datagen
>> >at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> >at
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> >... 33 more
>> >
>> >
>> >参考了这个
>> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>> >补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
>> >
>> >
>> >附上pom依赖:
>> >
>> >   
>> >   org.apache.flink
>> >   flink-java
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-table-api-java-bridge_2.12
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-table-api-java
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-connector-kafka_2.12
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-sql-connector-kafka_2.12
>> >   ${flink.version}
>> >   
>> >   
>> >   org.apache.flink
>> >   flink-json
>> >   ${flink.version}
>> >   
>> >   
>> >
>> >
>> >感谢各位~
>>


Re: Re: Could not find any factory for identifier 'kafka'

2020-07-26 文章 Caizhi Weng
Hi,

Flink 的 TableFactory 利用了 Java 的服务发现功能,所以需要这两个文件。需要确认 jar-with-dependencies
是否能把这些资源文件打进去。

另外为什么需要把 Flink 的依赖也打在大包里呢?因为 Flink 本身的 classpath 里就已经有这些依赖了,这个大包作为 Flink
的用户 jar 的话,并不需要把 Flink 的依赖也放进去。

RS  于2020年7月24日周五 下午8:30写道:

> hi,
> 感谢回复,尝试了多次之后,发现应该不是依赖包的问题
>
>
> 我项目中新增目录:resources/META-INF/services
> 然后从Flink源码中复制了2个文件
> org.apache.flink.table.factories.TableFactory和org.apache.flink.table.factories.Factory
> 这样编译就不会报错了,原理不太清楚,但是确实解决了报错的问题。
>
>
> 在 2020-07-24 20:16:18,"JasonLee" <17610775...@163.com> 写道:
> >hi
> >只需要-sql和-json两个包就可以了
> >
> >
> >| |
> >JasonLee
> >|
> >|
> >邮箱:17610775...@163.com
> >|
> >
> >Signature is customized by Netease Mail Master
> >
> >On 07/24/2020 17:02, RS wrote:
> >hi,
> >Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
> >编译的jar包是jar-with-dependencies的
> >
> >
> >代码片段:
> >   public String ddlSql = String.format("CREATE TABLE %s (\n" +
> >   "  number BIGINT,\n" +
> >   "  msg STRING,\n" +
> >   "  username STRING,\n" +
> >   "  update_time TIMESTAMP(3)\n" +
> >   ") WITH (\n" +
> >   " 'connector' = 'kafka',\n" +
> >   " 'topic' = '%s',\n" +
> >   " 'properties.bootstrap.servers' = '%s',\n" +
> >   " 'properties.group.id' = '%s',\n" +
> >   " 'format' = 'json',\n" +
> >   " 'json.fail-on-missing-field' = 'false',\n" +
> >   " 'json.ignore-parse-errors' = 'true'\n" +
> >   ")\n", tableName, topic, servers, group);
> >
> >
> >   StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >   StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
> >   tableEnv.executeSql(ddlSql);
> >
> >
> >报错信息:
> >Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
> >Available factory identifiers are:
> >datagen
> >at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> >at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> >... 33 more
> >
> >
> >参考了这个
> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
> >补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
> >
> >
> >附上pom依赖:
> >
> >   
> >   org.apache.flink
> >   flink-java
> >   ${flink.version}
> >   
> >   
> >   org.apache.flink
> >   flink-table-api-java-bridge_2.12
> >   ${flink.version}
> >   
> >   
> >   org.apache.flink
> >   flink-table-api-java
> >   ${flink.version}
> >   
> >   
> >   org.apache.flink
> >   flink-connector-kafka_2.12
> >   ${flink.version}
> >   
> >   
> >   org.apache.flink
> >   flink-sql-connector-kafka_2.12
> >   ${flink.version}
> >   
> >   
> >   org.apache.flink
> >   flink-json
> >   ${flink.version}
> >   
> >   
> >
> >
> >感谢各位~
>


Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 RS
hi,
感谢回复,尝试了多次之后,发现应该不是依赖包的问题


我项目中新增目录:resources/META-INF/services
然后从Flink源码中复制了2个文件 
org.apache.flink.table.factories.TableFactory和org.apache.flink.table.factories.Factory
这样编译就不会报错了,原理不太清楚,但是确实解决了报错的问题。


在 2020-07-24 20:16:18,"JasonLee" <17610775...@163.com> 写道:
>hi
>只需要-sql和-json两个包就可以了
>
>
>| |
>JasonLee
>|
>|
>邮箱:17610775...@163.com
>|
>
>Signature is customized by Netease Mail Master
>
>On 07/24/2020 17:02, RS wrote:
>hi,
>Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
>编译的jar包是jar-with-dependencies的
>
>
>代码片段:
>   public String ddlSql = String.format("CREATE TABLE %s (\n" +
>   "  number BIGINT,\n" +
>   "  msg STRING,\n" +
>   "  username STRING,\n" +
>   "  update_time TIMESTAMP(3)\n" +
>   ") WITH (\n" +
>   " 'connector' = 'kafka',\n" +
>   " 'topic' = '%s',\n" +
>   " 'properties.bootstrap.servers' = '%s',\n" +
>   " 'properties.group.id' = '%s',\n" +
>   " 'format' = 'json',\n" +
>   " 'json.fail-on-missing-field' = 'false',\n" +
>   " 'json.ignore-parse-errors' = 'true'\n" +
>   ")\n", tableName, topic, servers, group);
>
>
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>   tableEnv.executeSql(ddlSql);
>
>
>报错信息:
>Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
>factory for identifier 'kafka' that implements 
>'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
>Available factory identifiers are:
>datagen
>at 
>org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>at 
>org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>... 33 more
>
>
>参考了这个 
>http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
>
>
>附上pom依赖:
>
>   
>   org.apache.flink
>   flink-java
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-table-api-java-bridge_2.12
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-table-api-java
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-connector-kafka_2.12
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-sql-connector-kafka_2.12
>   ${flink.version}
>   
>   
>   org.apache.flink
>   flink-json
>   ${flink.version}
>   
>   
>
>
>感谢各位~


Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 JasonLee
hi
只需要-sql和-json两个包就可以了


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

On 07/24/2020 17:02, RS wrote:
hi,
Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
编译的jar包是jar-with-dependencies的


代码片段:
   public String ddlSql = String.format("CREATE TABLE %s (\n" +
   "  number BIGINT,\n" +
   "  msg STRING,\n" +
   "  username STRING,\n" +
   "  update_time TIMESTAMP(3)\n" +
   ") WITH (\n" +
   " 'connector' = 'kafka',\n" +
   " 'topic' = '%s',\n" +
   " 'properties.bootstrap.servers' = '%s',\n" +
   " 'properties.group.id' = '%s',\n" +
   " 'format' = 'json',\n" +
   " 'json.fail-on-missing-field' = 'false',\n" +
   " 'json.ignore-parse-errors' = 'true'\n" +
   ")\n", tableName, topic, servers, group);


   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
   tableEnv.executeSql(ddlSql);


报错信息:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more


参考了这个 
http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错


附上pom依赖:

   
   org.apache.flink
   flink-java
   ${flink.version}
   
   
   org.apache.flink
   flink-table-api-java-bridge_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-table-api-java
   ${flink.version}
   
   
   org.apache.flink
   flink-connector-kafka_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-sql-connector-kafka_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-json
   ${flink.version}
   
   


感谢各位~

Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 admin
  
   org.apache.flink
   flink-connector-kafka_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-sql-connector-kafka_2.12
   ${flink.version}
   

这两个会有冲突,去掉上面那个

> 2020年7月24日 下午5:02,RS  写道:
> 
>   
>org.apache.flink
>flink-connector-kafka_2.12
>${flink.version}
>
>
>org.apache.flink
>flink-sql-connector-kafka_2.12
>${flink.version}
>



Re:Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 RS
邮件格式不对,我重新回复下


我这边是直接打成jar包扔到服务器上运行的,没有在IDEA运行过。

> flink run xxx

没有使用shade-plugin

maven build参数:

1.8
1.11.1










maven-compiler-plugin



${jdk.version}

${jdk.version}







org.apache.maven.plugins

maven-assembly-plugin





package



single











jar-with-dependencies














Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 RS
我这边是直接打成jar包扔到服务器上运行的(bin/flink run 
xxx),没有在IDEA运行过。maven编译没配置shade-plugin,maven build参数如下:
propertiesjdk.version1.8/jdk.version  
  flink.version1.11.1/flink.version
/propertiesbuildplugins  
  plugin
artifactIdmaven-compiler-plugin/artifactId
configuration
source${jdk.version}/source
target${jdk.version}/target
/configuration/plugin
plugin
groupIdorg.apache.maven.plugins/groupId
artifactIdmaven-assembly-plugin/artifactId
executionsexecution   
 phasepackage/phase
goals
goalsingle/goal/goals 
   /execution
/executionsconfiguration  
  descriptorRefs
descriptorRefjar-with-dependencies/descriptorRef   
 /descriptorRefs
/configuration/plugin
/plugins/buildthx
在 2020-07-24 17:36:46,"Benchao Li"  写道:
>可能跟你的打包方式有关系。你这个程序如果直接在idea里面运行是可以运行的么?
>
>如果可以在idea运行,但是打出来的jar包不能提交运行的话,很有可能跟SPI文件有关系。
>如果你用的是shade plugin,需要看下这个transformer[1]
>
>[1]
>https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer
>
>RS  于2020年7月24日周五 下午5:02写道:
>
>> hi,
>> Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
>> 编译的jar包是jar-with-dependencies的
>>
>>
>> 代码片段:
>> public String ddlSql = String.format("CREATE TABLE %s (\n" +
>> "  number BIGINT,\n" +
>> "  msg STRING,\n" +
>> "  username STRING,\n" +
>> "  update_time TIMESTAMP(3)\n" +
>> ") WITH (\n" +
>> " 'connector' = 'kafka',\n" +
>> " 'topic' = '%s',\n" +
>> " 'properties.bootstrap.servers' = '%s',\n" +
>> " 'properties.group.id' = '%s',\n" +
>> " 'format' = 'json',\n" +
>> " 'json.fail-on-missing-field' = 'false',\n" +
>> " 'json.ignore-parse-errors' = 'true'\n" +
>> ")\n", tableName, topic, servers, group);
>>
>>
>>     StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(env);
>> tableEnv.executeSql(ddlSql);
>>
>>
>> 报错信息:
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.
>> Available factory identifiers are:
>> datagen
>> at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> at
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> ... 33 more
>>
>>
>> 参考了这个
>> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>> 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
>>
>>
>> 附上pom依赖:
>> 
>> 
>> org.apache.flink
>> flink-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-table-api-java-bridge_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-table-api-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-connector-kafka_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-sql-connector-kafka_2.12
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-json
>> ${flink.version}
>> 
>> 
>>
>>
>> 感谢各位~
>
>
>
>-- 
>
>Best,
>Benchao Li


Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 Benchao Li
可能跟你的打包方式有关系。你这个程序如果直接在idea里面运行是可以运行的么?

如果可以在idea运行,但是打出来的jar包不能提交运行的话,很有可能跟SPI文件有关系。
如果你用的是shade plugin,需要看下这个transformer[1]

[1]
https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer

RS  于2020年7月24日周五 下午5:02写道:

> hi,
> Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
> 编译的jar包是jar-with-dependencies的
>
>
> 代码片段:
> public String ddlSql = String.format("CREATE TABLE %s (\n" +
> "  number BIGINT,\n" +
> "  msg STRING,\n" +
> "  username STRING,\n" +
> "  update_time TIMESTAMP(3)\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = '%s',\n" +
> " 'properties.bootstrap.servers' = '%s',\n" +
> " 'properties.group.id' = '%s',\n" +
> " 'format' = 'json',\n" +
> " 'json.fail-on-missing-field' = 'false',\n" +
> " 'json.ignore-parse-errors' = 'true'\n" +
> ")\n", tableName, topic, servers, group);
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
> tableEnv.executeSql(ddlSql);
>
>
> 报错信息:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
> Available factory identifiers are:
> datagen
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> ... 33 more
>
>
> 参考了这个
> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
> 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
>
>
> 附上pom依赖:
> 
> 
> org.apache.flink
> flink-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-table-api-java-bridge_2.12
> ${flink.version}
> 
> 
> org.apache.flink
> flink-table-api-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-connector-kafka_2.12
> ${flink.version}
> 
> 
> org.apache.flink
> flink-sql-connector-kafka_2.12
> ${flink.version}
> 
> 
> org.apache.flink
> flink-json
> ${flink.version}
> 
> 
>
>
> 感谢各位~



-- 

Best,
Benchao Li


Could not find any factory for identifier 'kafka'

2020-07-24 文章 RS
hi,
Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
编译的jar包是jar-with-dependencies的


代码片段:
public String ddlSql = String.format("CREATE TABLE %s (\n" +
"  number BIGINT,\n" +
"  msg STRING,\n" +
"  username STRING,\n" +
"  update_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '%s',\n" +
" 'properties.bootstrap.servers' = '%s',\n" +
" 'properties.group.id' = '%s',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")\n", tableName, topic, servers, group);


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.executeSql(ddlSql);


报错信息:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more


参考了这个 
http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错


附上pom依赖:


org.apache.flink
flink-java
${flink.version}


org.apache.flink
flink-table-api-java-bridge_2.12
${flink.version}


org.apache.flink
flink-table-api-java
${flink.version}


org.apache.flink
flink-connector-kafka_2.12
${flink.version}


org.apache.flink
flink-sql-connector-kafka_2.12
${flink.version}


org.apache.flink
flink-json
${flink.version}




感谢各位~

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 王松
感谢大家的热情解答,最后问题解决了。原因正是 Leonard Xu所说的,我应该引入的是
flink-sql-connector-kafka-${version}_${scala.binary.version},然后当时改成
flink-sql-connector-kafka
后继续报错的原因是:我还在pom文件中引入了flink-table-planner-blink,如下:

org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}

添加provided后就没有问题了。

最后附上正确的pom文件 (如 Jingsong
所说,也可以把flink-sql-connector-kafka、flink-json这些都在pom文件中去掉,直接将jar报放入lib中):




org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}



org.apache.flink
flink-json
${flink.version}


org.apache.flink

flink-sql-connector-kafka_${scala.binary.version}
${flink.version}


org.apache.flink
flink-core
${flink.version}


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}




Jingsong Li  于2020年7月13日周一 下午4:35写道:

> Hi,
>
> 1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1]
>
> 2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java
> spi)
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> Best,
> Jingsong
>
> On Mon, Jul 13, 2020 at 4:04 PM 王松  wrote:
>
> > 你好本超,
> > 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的
> >
> > Benchao Li  于2020年7月13日周一 下午3:42写道:
> >
> > > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> > > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> > > 当然,直接粗暴的放到lib下,也是可以的。
> > >
> > > Leonard Xu  于2020年7月13日周一 下午3:38写道:
> > >
> > > > Hi
> > > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> > > >
> > > > 祝好
> > > >
> > > > > 在 2020年7月13日,15:28,王松  写道:
> > > > >
> > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > > > >
> > > > > 我机器上flink/lib下jar包如下:
> > > > > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41
> > flink-avro-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09
> > flink-csv-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > > > flink-dist_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09
> > flink-json-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > > > flink-shaded-zookeeper-3.4.14.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > > > flink-table_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > > > flink-table-blink_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> > > > log4j-1.2-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09
> > log4j-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09
> > log4j-core-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > > > > log4j-slf4j-impl-2.12.1.jar
> > > > >
> > > > > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> > > > >
> > > > >> Hi,
> > > > >> flink-connector-kafka_${scala.binary.version 和
> > > > >> flink-sql-connector-kafka_${scala.binary.version
> > > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > > > >>
> > > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > > > >>
> > > > >> 祝好
> > > > >> Leonard Xu
> > > > >>
> > > > >>> 在 2020年7月13日,14:42,王松  写道:
> > > > >>>
> > > > >>> @Leonard Xu,
> > > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > > >>> =
> > > > >>> 
> > > > >>>   org.apache.flink
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> flink-sql-connector-kafka_${scala.binary.version}
> > > > >>>   ${flink.version}
> > > > >>>   
> > > > >>>
> > > > >>>
> > > > >>>   
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> 
> > > > >>>   
> > > > >>>   
> > > > >>>   
> > > > >>>   
> > > > >>>
> > > > >> 
> > > > >>>   
> > > > >>>   
> > > > >>>   
> > > > >>>
> > > > >>>   
> > > > >>>   
> > > > >>>
> >  
> > > > >>>   
> > > > >>>   
> > > > >>> =
> > > > >>>
> > > > >>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> > > > >>>
> > > >  Hi, 王松
> > > > 
> > > >  这个报错是pom中缺少了 Kafka SQL 

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 Jingsong Li
Hi,

1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1]
2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java
spi)

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
Jingsong

On Mon, Jul 13, 2020 at 4:04 PM 王松  wrote:

> 你好本超,
> 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的
>
> Benchao Li  于2020年7月13日周一 下午3:42写道:
>
> > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> > 当然,直接粗暴的放到lib下,也是可以的。
> >
> > Leonard Xu  于2020年7月13日周一 下午3:38写道:
> >
> > > Hi
> > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> > >
> > > 祝好
> > >
> > > > 在 2020年7月13日,15:28,王松  写道:
> > > >
> > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > > >
> > > > 我机器上flink/lib下jar包如下:
> > > > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41
> flink-avro-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09
> flink-csv-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > > flink-dist_2.11-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09
> flink-json-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > > flink-shaded-zookeeper-3.4.14.jar
> > > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > > flink-table_2.11-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > > flink-table-blink_2.11-1.11.0.jar
> > > > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> > > log4j-1.2-api-2.12.1.jar
> > > > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09
> log4j-api-2.12.1.jar
> > > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09
> log4j-core-2.12.1.jar
> > > > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > > > log4j-slf4j-impl-2.12.1.jar
> > > >
> > > > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> > > >
> > > >> Hi,
> > > >> flink-connector-kafka_${scala.binary.version 和
> > > >> flink-sql-connector-kafka_${scala.binary.version
> > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > > >>
> > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > > >>
> > > >> 祝好
> > > >> Leonard Xu
> > > >>
> > > >>> 在 2020年7月13日,14:42,王松  写道:
> > > >>>
> > > >>> @Leonard Xu,
> > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > > >>>
> > > >>> [1]
> > > >>>
> > > >>
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > >>> =
> > > >>> 
> > > >>>   org.apache.flink
> > > >>>
> > > >>>
> > > >>
> > >
> >
> flink-sql-connector-kafka_${scala.binary.version}
> > > >>>   ${flink.version}
> > > >>>   
> > > >>>
> > > >>>
> > > >>>   
> > > >>>
> > > >>>
> > > >>
> > >
> >
> 
> > > >>>   
> > > >>>   
> > > >>>   
> > > >>>   
> > > >>>
> > > >> 
> > > >>>   
> > > >>>   
> > > >>>   
> > > >>>
> > > >>>   
> > > >>>   
> > > >>>
>  
> > > >>>   
> > > >>>   
> > > >>> =
> > > >>>
> > > >>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> > > >>>
> > >  Hi, 王松
> > > 
> > >  这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> > > connector
> > > 
> > > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> > >  可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> > >  datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> > > 
> > > 
> > >  祝好,
> > >  Leonard Xu
> > >  [1]
> > > 
> > > >>
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > >  <
> > > 
> > > >>
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > >
> > > >  
> > > >  org.apache.flink
> > > >
> > > >
> > > flink-connector-kafka_${scala.binary.version}
> > > >  ${flink.version}
> > > >  
> > > >  
> > > >  org.apache.flink
> > > >
> > > >
> > > 
> > > >>
> > >
> >
> flink-connector-kafka-0.11_${scala.binary.version}
> > > >  ${flink.version}
> > > >  
> > > >  
> > > >  org.apache.flink
> > > >
> > > >
> > > flink-connector-kafka_${scala.binary.version}
> > > >  ${flink.version}
> > > >  
> > > >  
> > > >  org.apache.flink
> > > >  flink-core
> > > >  ${flink.version}
> > > >  
> > > > =
> > > 
> > > 
> > > >>
> > > >>
> > >
> > >

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 王松
你好本超,
是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的

Benchao Li  于2020年7月13日周一 下午3:42写道:

> 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> 当然,直接粗暴的放到lib下,也是可以的。
>
> Leonard Xu  于2020年7月13日周一 下午3:38写道:
>
> > Hi
> > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> >
> > 祝好
> >
> > > 在 2020年7月13日,15:28,王松  写道:
> > >
> > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > >
> > > 我机器上flink/lib下jar包如下:
> > > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41 flink-avro-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09 flink-csv-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > flink-dist_2.11-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09 flink-json-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > flink-shaded-zookeeper-3.4.14.jar
> > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > flink-table_2.11-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > flink-table-blink_2.11-1.11.0.jar
> > > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> > log4j-1.2-api-2.12.1.jar
> > > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09 log4j-api-2.12.1.jar
> > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> > > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > > log4j-slf4j-impl-2.12.1.jar
> > >
> > > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> > >
> > >> Hi,
> > >> flink-connector-kafka_${scala.binary.version 和
> > >> flink-sql-connector-kafka_${scala.binary.version
> > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > >>
> > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > >>
> > >> 祝好
> > >> Leonard Xu
> > >>
> > >>> 在 2020年7月13日,14:42,王松  写道:
> > >>>
> > >>> @Leonard Xu,
> > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > >>>
> > >>> [1]
> > >>>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > >>> =
> > >>> 
> > >>>   org.apache.flink
> > >>>
> > >>>
> > >>
> >
> flink-sql-connector-kafka_${scala.binary.version}
> > >>>   ${flink.version}
> > >>>   
> > >>>
> > >>>
> > >>>   
> > >>>
> > >>>
> > >>
> >
> 
> > >>>   
> > >>>   
> > >>>   
> > >>>   
> > >>>
> > >> 
> > >>>   
> > >>>   
> > >>>   
> > >>>
> > >>>   
> > >>>   
> > >>>   
> > >>>   
> > >>>   
> > >>> =
> > >>>
> > >>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> > >>>
> >  Hi, 王松
> > 
> >  这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> > connector
> > 
> > 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> >  可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> >  datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> > 
> > 
> >  祝好,
> >  Leonard Xu
> >  [1]
> > 
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >  <
> > 
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > >
> > >  
> > >  org.apache.flink
> > >
> > >
> > flink-connector-kafka_${scala.binary.version}
> > >  ${flink.version}
> > >  
> > >  
> > >  org.apache.flink
> > >
> > >
> > 
> > >>
> >
> flink-connector-kafka-0.11_${scala.binary.version}
> > >  ${flink.version}
> > >  
> > >  
> > >  org.apache.flink
> > >
> > >
> > flink-connector-kafka_${scala.binary.version}
> > >  ${flink.version}
> > >  
> > >  
> > >  org.apache.flink
> > >  flink-core
> > >  ${flink.version}
> > >  
> > > =
> > 
> > 
> > >>
> > >>
> >
> >
>
> --
>
> Best,
> Benchao Li
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 王松
这样还是不行,我尝试flink-connector-kafka-0.11_2.11-1.11.0.jar放到lib下,报了另外一个问题:
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase


另外,我是用 bin/flink run -yid xxx xxx.jar 的方式提交任务的,报错是直接在终端报错,没有提交到flink
jobmanager上。

Leonard Xu  于2020年7月13日周一 下午3:38写道:

> Hi
> 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
>
> 祝好
>
> > 在 2020年7月13日,15:28,王松  写道:
> >
> > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> >
> > 我机器上flink/lib下jar包如下:
> > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41 flink-avro-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09 flink-csv-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> flink-dist_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09 flink-json-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > flink-shaded-zookeeper-3.4.14.jar
> > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > flink-table_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > flink-table-blink_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> log4j-1.2-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09 log4j-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > log4j-slf4j-impl-2.12.1.jar
> >
> > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> >
> >> Hi,
> >> flink-connector-kafka_${scala.binary.version 和
> >> flink-sql-connector-kafka_${scala.binary.version
> >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> >>
> >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> >>
> >> 祝好
> >> Leonard Xu
> >>
> >>> 在 2020年7月13日,14:42,王松  写道:
> >>>
> >>> @Leonard Xu,
> >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> >>>
> >>> [1]
> >>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>> =
> >>> 
> >>>   org.apache.flink
> >>>
> >>>
> >>
> flink-sql-connector-kafka_${scala.binary.version}
> >>>   ${flink.version}
> >>>   
> >>>
> >>>
> >>>   
> >>>
> >>>
> >>
> 
> >>>   
> >>>   
> >>>   
> >>>   
> >>>
> >> 
> >>>   
> >>>   
> >>>   
> >>>
> >>>   
> >>>   
> >>>   
> >>>   
> >>>   
> >>> =
> >>>
> >>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> >>>
>  Hi, 王松
> 
>  这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> connector
> 
> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
>  可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
>  datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> 
> 
>  祝好,
>  Leonard Xu
>  [1]
> 
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>  <
> 
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >
> >  
> >  org.apache.flink
> >
> >
> flink-connector-kafka_${scala.binary.version}
> >  ${flink.version}
> >  
> >  
> >  org.apache.flink
> >
> >
> 
> >>
> flink-connector-kafka-0.11_${scala.binary.version}
> >  ${flink.version}
> >  
> >  
> >  org.apache.flink
> >
> >
> flink-connector-kafka_${scala.binary.version}
> >  ${flink.version}
> >  
> >  
> >  org.apache.flink
> >  flink-core
> >  ${flink.version}
> >  
> > =
> 
> 
> >>
> >>
>
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 Benchao Li
你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
当然,直接粗暴的放到lib下,也是可以的。

Leonard Xu  于2020年7月13日周一 下午3:38写道:

> Hi
> 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
>
> 祝好
>
> > 在 2020年7月13日,15:28,王松  写道:
> >
> > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> >
> > 我机器上flink/lib下jar包如下:
> > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41 flink-avro-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09 flink-csv-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> flink-dist_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09 flink-json-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > flink-shaded-zookeeper-3.4.14.jar
> > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > flink-table_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > flink-table-blink_2.11-1.11.0.jar
> > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> log4j-1.2-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09 log4j-api-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > log4j-slf4j-impl-2.12.1.jar
> >
> > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> >
> >> Hi,
> >> flink-connector-kafka_${scala.binary.version 和
> >> flink-sql-connector-kafka_${scala.binary.version
> >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> >>
> >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> >>
> >> 祝好
> >> Leonard Xu
> >>
> >>> 在 2020年7月13日,14:42,王松  写道:
> >>>
> >>> @Leonard Xu,
> >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> >>>
> >>> [1]
> >>>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>> =
> >>> 
> >>>   org.apache.flink
> >>>
> >>>
> >>
> flink-sql-connector-kafka_${scala.binary.version}
> >>>   ${flink.version}
> >>>   
> >>>
> >>>
> >>>   
> >>>
> >>>
> >>
> 
> >>>   
> >>>   
> >>>   
> >>>   
> >>>
> >> 
> >>>   
> >>>   
> >>>   
> >>>
> >>>   
> >>>   
> >>>   
> >>>   
> >>>   
> >>> =
> >>>
> >>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> >>>
>  Hi, 王松
> 
>  这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream
> connector
> 
> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
>  可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
>  datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> 
> 
>  祝好,
>  Leonard Xu
>  [1]
> 
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>  <
> 
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >
> >  
> >  org.apache.flink
> >
> >
> flink-connector-kafka_${scala.binary.version}
> >  ${flink.version}
> >  
> >  
> >  org.apache.flink
> >
> >
> 
> >>
> flink-connector-kafka-0.11_${scala.binary.version}
> >  ${flink.version}
> >  
> >  
> >  org.apache.flink
> >
> >
> flink-connector-kafka_${scala.binary.version}
> >  ${flink.version}
> >  
> >  
> >  org.apache.flink
> >  flink-core
> >  ${flink.version}
> >  
> > =
> 
> 
> >>
> >>
>
>

-- 

Best,
Benchao Li


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 Leonard Xu
Hi
你可以试下把 flink-connector-kafka_2.11-1.11.0.jar 
的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。

祝好

> 在 2020年7月13日,15:28,王松  写道:
> 
> 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> 
> 我机器上flink/lib下jar包如下:
> -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41 flink-avro-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09 flink-csv-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09 flink-dist_2.11-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09 flink-json-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> flink-shaded-zookeeper-3.4.14.jar
> -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> flink-table_2.11-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> flink-table-blink_2.11-1.11.0.jar
> -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09 log4j-1.2-api-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09 log4j-api-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
> -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> log4j-slf4j-impl-2.12.1.jar
> 
> Leonard Xu  于2020年7月13日周一 下午3:05写道:
> 
>> Hi,
>> flink-connector-kafka_${scala.binary.version 和
>> flink-sql-connector-kafka_${scala.binary.version
>> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
>> 后者的话主要是对前者做了shade处理,方便用户在 SQL
>> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
>> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
>> 
>> [1] 中的话是有SQL Client JAR 的下载链接,就是
>> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
>> 
>> 祝好
>> Leonard Xu
>> 
>>> 在 2020年7月13日,14:42,王松  写道:
>>> 
>>> @Leonard Xu,
>>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
>>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
>>> 
>>> [1]
>>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>> =
>>> 
>>>   org.apache.flink
>>> 
>>> 
>> flink-sql-connector-kafka_${scala.binary.version}
>>>   ${flink.version}
>>>   
>>> 
>>>
>>>   
>>> 
>>> 
>> 
>>>   
>>>   
>>>   
>>>   
>>> 
>> 
>>>   
>>>   
>>>   
>>> 
>>>   
>>>   
>>>   
>>>   
>>>   
>>> =
>>> 
>>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
>>> 
 Hi, 王松
 
 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
 datastream  connector 同时引用是会冲突的,请根据你的需要使用。
 
 
 祝好,
 Leonard Xu
 [1]
 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
 <
 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> 
>  
>  org.apache.flink
> 
> flink-connector-kafka_${scala.binary.version}
>  ${flink.version}
>  
>  
>  org.apache.flink
> 
> 
 
>> flink-connector-kafka-0.11_${scala.binary.version}
>  ${flink.version}
>  
>  
>  org.apache.flink
> 
> flink-connector-kafka_${scala.binary.version}
>  ${flink.version}
>  
>  
>  org.apache.flink
>  flink-core
>  ${flink.version}
>  
> =
 
 
>> 
>> 



Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 王松
您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。

我机器上flink/lib下jar包如下:
-rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41 flink-avro-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09 flink-csv-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09 flink-dist_2.11-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09 flink-json-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
flink-table_2.11-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
flink-table-blink_2.11-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09 log4j-api-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09 log4j-core-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
log4j-slf4j-impl-2.12.1.jar

Leonard Xu  于2020年7月13日周一 下午3:05写道:

> Hi,
> flink-connector-kafka_${scala.binary.version 和
> flink-sql-connector-kafka_${scala.binary.version
> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
>
> [1] 中的话是有SQL Client JAR 的下载链接,就是
> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
>
> 祝好
> Leonard Xu
>
> > 在 2020年7月13日,14:42,王松  写道:
> >
> > @Leonard Xu,
> > 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > =
> >  
> >org.apache.flink
> >
> >
> flink-sql-connector-kafka_${scala.binary.version}
> >${flink.version}
> >
> >
> > 
> >
> >
> >
> 
> >
> >
> >
> >
> >
> 
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > =
> >
> > Leonard Xu  于2020年7月13日周一 下午1:39写道:
> >
> >> Hi, 王松
> >>
> >> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
> >> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> >> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> >> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
> >>
> >>
> >> 祝好,
> >> Leonard Xu
> >> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >> <
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >>>
> >>>   
> >>>   org.apache.flink
> >>>
> >>> flink-connector-kafka_${scala.binary.version}
> >>>   ${flink.version}
> >>>   
> >>>   
> >>>   org.apache.flink
> >>>
> >>>
> >>
> flink-connector-kafka-0.11_${scala.binary.version}
> >>>   ${flink.version}
> >>>   
> >>>   
> >>>   org.apache.flink
> >>>
> >>> flink-connector-kafka_${scala.binary.version}
> >>>   ${flink.version}
> >>>   
> >>>   
> >>>   org.apache.flink
> >>>   flink-core
> >>>   ${flink.version}
> >>>   
> >>> =
> >>
> >>
>
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 Leonard Xu
Hi,
flink-connector-kafka_${scala.binary.version 和 
flink-sql-connector-kafka_${scala.binary.version 只用加载一个应该就好了,前者的话是dataStream 或者 
Table API 程序使用,
后者的话主要是对前者做了shade处理,方便用户在 SQL 
Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。

[1] 中的话是有SQL Client JAR 的下载链接,就是 
flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。 

祝好
Leonard Xu

> 在 2020年7月13日,14:42,王松  写道:
> 
> @Leonard Xu,
> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> =
>  
>org.apache.flink
> 
> flink-sql-connector-kafka_${scala.binary.version}
>${flink.version}
>
> 
> 
>
> 
> 
>
>
>
>
>
>
>
>
> 
>
>
>
>
>
> =
> 
> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> 
>> Hi, 王松
>> 
>> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
>> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
>> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
>> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
>> 
>> 
>> 祝好,
>> Leonard Xu
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>> <
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>> 
>>>   
>>>   org.apache.flink
>>> 
>>> flink-connector-kafka_${scala.binary.version}
>>>   ${flink.version}
>>>   
>>>   
>>>   org.apache.flink
>>> 
>>> 
>> flink-connector-kafka-0.11_${scala.binary.version}
>>>   ${flink.version}
>>>   
>>>   
>>>   org.apache.flink
>>> 
>>> flink-connector-kafka_${scala.binary.version}
>>>   ${flink.version}
>>>   
>>>   
>>>   org.apache.flink
>>>   flink-core
>>>   ${flink.version}
>>>   
>>> =
>> 
>> 



Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 王松
@Leonard Xu,
非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
=
  
org.apache.flink

flink-sql-connector-kafka_${scala.binary.version}
${flink.version}


 

















=

Leonard Xu  于2020年7月13日周一 下午1:39写道:

> Hi, 王松
>
> 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector
> 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
> 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka
> datastream  connector 同时引用是会冲突的,请根据你的需要使用。
>
>
> 祝好,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> >
> >
> >org.apache.flink
> >
> > flink-connector-kafka_${scala.binary.version}
> >${flink.version}
> >
> >
> >org.apache.flink
> >
> >
> flink-connector-kafka-0.11_${scala.binary.version}
> >${flink.version}
> >
> >
> >org.apache.flink
> >
> > flink-connector-kafka_${scala.binary.version}
> >${flink.version}
> >
> >
> >org.apache.flink
> >flink-core
> >${flink.version}
> >
> > =
>
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 文章 Leonard Xu
Hi, 王松

这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream  connector 
的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} 
可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka datastream  
connector 同时引用是会冲突的,请根据你的需要使用。


祝好,
Leonard Xu
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
 

>
>org.apache.flink
> 
> flink-connector-kafka_${scala.binary.version}
>${flink.version}
>
>
>org.apache.flink
> 
> flink-connector-kafka-0.11_${scala.binary.version}
>${flink.version}
>
>
>org.apache.flink
> 
> flink-connector-kafka_${scala.binary.version}
>${flink.version}
>
>
>org.apache.flink
>flink-core
>${flink.version}
> 
> =



Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 文章 tison
那就要看下你是什么 Flink 版本,怎么提交到 YARN 上的,以及 YARN 的日志上的 classpath 是啥了

Best,
tison.


王松  于2020年7月13日周一 下午12:54写道:

> 各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
> 请问是什么原因导致的呢?
>
> 代码如下:
>
>
> -
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env,
> settings);
>
> tenv.executeSql("CREATE TABLE test_table (\n" +
> " id INT,\n" +
> " name STRING,\n" +
> " age INT,\n" +
> " create_at TIMESTAMP(3)\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = 'test_json',\n" +
> " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> " 'properties.group.id' = 'testGroup',\n" +
> " 'format' = 'json',\n" +
> " 'scan.startup.mode' = 'latest-offset'\n" +
> ")");
> Table table = tenv.sqlQuery("select * from test_table");
> tenv.toRetractStream(table, Row.class).print();
> env.execute("flink 1.11.0 demo");
>
> -
>
> pom 文件如下:
> =
> 
> 2.11
> 1.11.0
> 
> 
> 
> org.apache.flink
>
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-table-runtime-blink_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-json
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-connector-kafka-0.11_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-core
> ${flink.version}
> 
> 
> org.apache.flink
> flink-clients_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-table-common
> ${flink.version}
> 
> 
> =
>


flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 文章 王松
各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.
请问是什么原因导致的呢?

代码如下:

-
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env,
settings);

tenv.executeSql("CREATE TABLE test_table (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT,\n" +
" create_at TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test_json',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'latest-offset'\n" +
")");
Table table = tenv.sqlQuery("select * from test_table");
tenv.toRetractStream(table, Row.class).print();
env.execute("flink 1.11.0 demo");
-

pom 文件如下:
=

2.11
1.11.0



org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}


org.apache.flink

flink-table-runtime-blink_${scala.binary.version}
${flink.version}


org.apache.flink
flink-json
${flink.version}


org.apache.flink

flink-connector-kafka_${scala.binary.version}
${flink.version}


org.apache.flink

flink-connector-kafka-0.11_${scala.binary.version}
${flink.version}


org.apache.flink

flink-connector-kafka_${scala.binary.version}
${flink.version}


org.apache.flink
flink-core
${flink.version}


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}


org.apache.flink
flink-table-common
${flink.version}


=