Hi, 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用 blink planner,需要用 EnvironmentSetting 声明 blink planner。 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>
Best, Jark > 在 2019年8月26日,14:56,ddwcg <3149768...@qq.com> 写道: > > 大家好, > 升级到1.9后有几个问题: > 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011 > > val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new > SimpleStringSchema, properties) > 但是现在这个类已经找不到了 > > 2.所以我使用了 FlinkKafkaConsumer > val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new > SimpleStringSchema(), properties) > 不知道这个consumer背后对应的kafka版本是多少 > > 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version} > > 不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: > org.apache.flink.table.factories.StreamTableSourceFactory > > > > 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错: > Exception in thread "main" org.apache.flink.table.api.TableException: Could > not instantiate the executor. Make sure a planner module is on the classpath > at > org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329) > at > org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286) > at > org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366) > at > org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336) > at com.test.StreamingJob$.main(StreamingJob.scala:52) > at com.test.StreamingJob.main(StreamingJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could > not find a suitable table factory for > 'org.apache.flink.table.delegation.ExecutorFactory' in > the classpath. > > Reason: No factory implements > 'org.apache.flink.table.delegation.ExecutorFactory'. > > The following properties are requested: > batch-mode=false > > The following factories have been considered: > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > org.apache.flink.table.catalog.GenericInMemoryCatalogFactory > > > 我的pom文件如下: > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > <scope>provided</scope> > </dependency> > > <!-- Scala Library, provided by Flink as well. --> > <dependency> > <groupId>org.scala-lang</groupId> > <artifactId>scala-library</artifactId> > <version>${scala.version}</version> > <scope>provided</scope> > </dependency> > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka_2.11</artifactId> > <version>${flink.version}</version> > <scope>compile</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-common</artifactId> > <version>${flink.version}</version> > </dependency> > > > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> > <version>${flink.version}</version> > </dependency> > > > 谢谢大家