It looks like you are depending on an old version of EnvironmentSettings, possibly because of a stale local Maven cache.
You can try clearing the "~/.m2/repository/org/apache/flink/flink-table-api-java" directory.

Best,
Jark

> On Aug 26, 2019, at 17:56, ddwcg <3149768...@qq.com> wrote:
>
> I have added all of them and it still does not work. Below are my pom file and a screenshot of my libraries.
>
> <repositories>
>     <repository>
>         <id>apache.snapshots</id>
>         <name>Apache Development Snapshot Repository</name>
>         <url>https://repository.apache.org/content/repositories/snapshots/</url>
>         <releases>
>             <enabled>false</enabled>
>         </releases>
>         <snapshots>
>             <enabled>true</enabled>
>         </snapshots>
>     </repository>
> </repositories>
>
> <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <flink.version>1.9.0</flink.version>
>     <scala.binary.version>2.11</scala.binary.version>
>     <scala.version>2.11.8</scala.version>
> </properties>
>
> <dependencies>
>     <!-- Apache Flink dependencies -->
>     <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-scala_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>
>     <!-- Scala Library, provided by Flink as well. -->
>     <dependency>
>         <groupId>org.scala-lang</groupId>
>         <artifactId>scala-library</artifactId>
>         <version>${scala.version}</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>         <scope>compile</scope>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-common</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.slf4j</groupId>
>         <artifactId>slf4j-log4j12</artifactId>
>         <version>1.7.7</version>
>         <scope>runtime</scope>
>     </dependency>
>     <dependency>
>         <groupId>log4j</groupId>
>         <artifactId>log4j</artifactId>
>         <version>1.2.17</version>
>         <scope>runtime</scope>
>     </dependency>
> </dependencies>
>
> <屏幕快照 2019-08-26 17.54.22.png>
>
>> On Aug 26, 2019, at 17:39, Jark Wu <imj...@gmail.com> wrote:
>>
>> Does your pom.xml include the flink-table-planner-blink_2.11 dependency? Please double-check its version number.
>> If you want to run the job in the IDE, make sure the dependency is not marked <scope>provided</scope>.
>>
>>> On Aug 26, 2019, at 17:25, ddwcg <3149768...@qq.com> wrote:
>>>
>>> Hi, I explicitly selected the blink planner and still get the same error.
>>>
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.flink.table.api.EnvironmentSettings
>>> import org.apache.flink.table.api.scala.StreamTableEnvironment
>>>
>>> object StreamingJob {
>>>   def main(args: Array[String]): Unit = {
>>>     val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>     // Select the blink planner in streaming mode
>>>     val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>     // This is the call that throws the TableException below
>>>     val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>>>     //val bsTableEnv2 = TableEnvironment.create(bsSettings)
>>>     bsEnv.execute("jobname")
>>>   }
>>> }
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
>>>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>>     at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>>     at com.test.StreamingJob$.main(StreamingJob.scala:13)
>>>     at com.test.StreamingJob.main(StreamingJob.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
>>> 'org.apache.flink.table.delegation.ExecutorFactory' in
>>> the classpath.
>>>
>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>>
>>> The following properties are requested:
>>> batch-mode=false
>>> class-name=org.apache.flink.table.executor.BlinkExecutorFactory
>>>
>>> The following factories have been considered:
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>>
>>>> On Aug 26, 2019, at 15:07, Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> This error is not a Kafka problem. It comes from how the planner is selected: if you want to use the blink planner, you need to declare it through EnvironmentSettings.
>>>> For details, see:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>> On Aug 26, 2019, at 14:56, ddwcg <3149768...@qq.com> wrote:
>>>>>
>>>>> Hi all,
>>>>> I have run into a few problems after upgrading to 1.9:
>>>>>
>>>>> 1. Our production version is 1.7.2, and the existing code uses FlinkKafkaConsumer011 as its Kafka consumer:
>>>>>
>>>>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
>>>>>
>>>>> This class can no longer be found.
>>>>>
>>>>> 2. So I switched to FlinkKafkaConsumer:
>>>>>
>>>>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
>>>>>
>>>>> I do not know which Kafka version this consumer corresponds to.
>>>>>
>>>>> 3. Using FlinkKafkaConsumer fails unless I add flink-table-api-java-bridge_${scala.binary.version}. Without it, a class is reported as missing:
>>>>>
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
>>>>>
>>>>> After adding flink-table-api-java-bridge_${scala.binary.version}, it still fails:
>>>>>
>>>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
>>>>>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>>>>     at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>>>>     at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>>>>     at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
>>>>>     at com.test.StreamingJob$.main(StreamingJob.scala:52)
>>>>>     at com.test.StreamingJob.main(StreamingJob.scala)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
>>>>> 'org.apache.flink.table.delegation.ExecutorFactory' in
>>>>> the classpath.
>>>>>
>>>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>>>>
>>>>> The following properties are requested:
>>>>> batch-mode=false
>>>>>
>>>>> The following factories have been considered:
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>>>>
>>>>> My pom file is as follows:
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-scala_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>>     <scope>provided</scope>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>>     <scope>provided</scope>
>>>>> </dependency>
>>>>>
>>>>> <!-- Scala Library, provided by Flink as well. -->
>>>>> <dependency>
>>>>>     <groupId>org.scala-lang</groupId>
>>>>>     <artifactId>scala-library</artifactId>
>>>>>     <version>${scala.version}</version>
>>>>>     <scope>provided</scope>
>>>>> </dependency>
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-connector-kafka_2.11</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>>     <scope>compile</scope>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-table-common</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>> Thanks, everyone.
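
One way to check the stale-cache theory from the top of this thread is to print where the relevant classes are actually loaded from when the job runs in the IDE. The following is only a minimal sketch and not something posted in the thread: the object name ClasspathCheck and the helper locate are hypothetical, and the two class names are taken from the reply and the error output quoted above. It uses JDK reflection only, so it compiles even when Flink is not on the classpath.

```scala
object ClasspathCheck {

  /** Returns the location a class was loaded from, or None if it cannot be loaded. */
  def locate(className: String): Option[String] = {
    val loaded: Option[Class[_]] =
      try {
        // initialize = false: only load the class, do not run its static initializers
        Some(Class.forName(className, false, getClass.getClassLoader))
      } catch {
        case _: ClassNotFoundException | _: LinkageError => None
      }

    loaded.map { cls =>
      // The CodeSource is null for classes loaded by the bootstrap class loader
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(source => Option(source.getLocation))
        .map(_.toString)
        .getOrElse("<bootstrap or unknown location>")
    }
  }

  def main(args: Array[String]): Unit = {
    // Class names as they appear in this thread (assumed to be the ones worth checking)
    val classNames = Seq(
      "org.apache.flink.table.api.EnvironmentSettings",
      "org.apache.flink.table.executor.BlinkExecutorFactory"
    )
    classNames.foreach { name =>
      println(s"$name -> ${locate(name).getOrElse("NOT FOUND on classpath")}")
    }
  }
}
```

If EnvironmentSettings resolves to a JAR whose version differs from ${flink.version}, or BlinkExecutorFactory is reported as not found, that would be consistent with a stale or missing artifact. The sketch does not prove the cause; it only shows what the IDE run configuration put on the classpath.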