Hi, I explicitly selected the Blink planner, but I still get the same error:

    // Imports assumed for Flink 1.9; they were not shown in the original message.
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.scala.StreamTableEnvironment

    object StreamingJob {
      def main(args: Array[String]): Unit = {
        val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
        // Select the Blink planner in streaming mode.
        val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
        //val bsTableEnv2 = TableEnvironment.create(bsSettings)
        bsEnv.execute("jobname")
      }
    }

Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
    at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
    at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
    at com.test.StreamingJob$.main(StreamingJob.scala:13)
    at com.test.StreamingJob.main(StreamingJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
'org.apache.flink.table.delegation.ExecutorFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
The following properties are requested:
batch-mode=false
class-name=org.apache.flink.table.executor.BlinkExecutorFactory

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory

> On Aug 26, 2019, at 15:07, Jark Wu <imj...@gmail.com> wrote:
>
> Hi,
>
> This error is not a Kafka problem. It is a planner usage problem: to use the
> Blink planner, you need to declare it through EnvironmentSettings.
> For details, see:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>
> Best,
> Jark
>
>> On Aug 26, 2019, at 14:56, ddwcg <3149768...@qq.com> wrote:
>>
>> Hi all,
>> After upgrading to 1.9 I have a few questions:
>> 1. Our production version is currently 1.7.2, and the old code used
>> FlinkKafkaConsumer011 as the Kafka consumer:
>>
>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
>>
>> This class can no longer be found.
>>
>> 2. So I switched to FlinkKafkaConsumer:
>>
>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
>>
>> I do not know which Kafka version this consumer corresponds to.
>>
>> 3. After switching to FlinkKafkaConsumer I get an error, and I have to add
>> flink-table-api-java-bridge_${scala.binary.version}; otherwise a class is
>> reported missing:
>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
>>
>> After adding flink-table-api-java-bridge_${scala.binary.version} it still fails:
>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor.
>> Make sure a planner module is on the classpath
>>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>     at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>     at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
>>     at com.test.StreamingJob$.main(StreamingJob.scala:52)
>>     at com.test.StreamingJob.main(StreamingJob.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
>> 'org.apache.flink.table.delegation.ExecutorFactory' in
>> the classpath.
>>
>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>
>> The following properties are requested:
>> batch-mode=false
>>
>> The following factories have been considered:
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>
>>
>> My pom file is as follows:
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-scala_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>>   <scope>provided</scope>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>>   <scope>provided</scope>
>> </dependency>
>>
>> <!-- Scala Library, provided by Flink as well. -->
>> <dependency>
>>   <groupId>org.scala-lang</groupId>
>>   <artifactId>scala-library</artifactId>
>>   <version>${scala.version}</version>
>>   <scope>provided</scope>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka_2.11</artifactId>
>>   <version>${flink.version}</version>
>>   <scope>compile</scope>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table-common</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>>
>> Thanks, everyone.
>
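
Both error outputs in this thread list only KafkaTableSourceSinkFactory and GenericInMemoryCatalogFactory under "The following factories have been considered", which suggests that no executor factory was visible on the runtime classpath when the job was launched from the IDE. One way to check this might be a small diagnostic like the sketch below. It is not part of Flink: `PlannerClasspathCheck` and its methods are hypothetical names, the executor factory class name is copied from the `class-name` property in the error output, and the use of `org.apache.flink.table.factories.TableFactory` as the service interface for discovery is an assumption about Flink 1.9. Both classes are looked up by name, so the sketch compiles without any Flink dependency.

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical diagnostic helper; not part of Flink.
object PlannerClasspathCheck {
  // Copied from the "class-name" property in the error output.
  val RequestedExecutorFactory = "org.apache.flink.table.executor.BlinkExecutorFactory"
  // Assumption: the service interface Flink 1.9 uses to discover table factories.
  val FactoryInterface = "org.apache.flink.table.factories.TableFactory"

  // Look a class up by name without initializing it; None if it is not on the classpath.
  private def loadClass(name: String): Option[Class[_]] =
    Try[Class[_]](Class.forName(name, false, getClass.getClassLoader)).toOption

  def isOnClasspath(name: String): Boolean = loadClass(name).isDefined

  // Names of all implementations registered under META-INF/services for the given interface.
  // Returns an empty list if the interface itself cannot be loaded.
  def discoveredFactories(interfaceName: String): Seq[String] =
    loadClass(interfaceName).toSeq.flatMap { iface =>
      ServiceLoader.load(iface.asInstanceOf[Class[AnyRef]]).asScala.map(_.getClass.getName).toSeq
    }

  def main(args: Array[String]): Unit = {
    println(s"$RequestedExecutorFactory on classpath: ${isOnClasspath(RequestedExecutorFactory)}")
    println("Factories discovered via ServiceLoader:")
    discoveredFactories(FactoryInterface).foreach(name => println(s"  $name"))
  }
}
```

The check is only meaningful when run with the same classpath as the job, for example from the same IDE run configuration. If the requested class is reported as missing, a reasonable next step would be to confirm that flink-table-planner-blink is actually resolved, and at the same ${flink.version} as the other Flink modules.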