It looks like you are depending on an old version of EnvironmentSettings, probably caused by a stale artifact in your local mvn cache.

You can try clearing the "~/.m2/repository/org/apache/flink/flink-table-api-java" directory.
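For example (just a sketch, assuming the default local repository path), remove the cached artifact and rebuild with forced dependency updates:

rm -rf ~/.m2/repository/org/apache/flink/flink-table-api-java
mvn -U clean package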


Best,
Jark

> On Aug 26, 2019, at 17:56, ddwcg <3149768...@qq.com> wrote:
> 
> I have added all of them, but it still doesn't work. Below are my pom file and a screenshot of the libraries.
> 
> <repositories>
>    <repository>
>       <id>apache.snapshots</id>
>       <name>Apache Development Snapshot Repository</name>
>       <url>https://repository.apache.org/content/repositories/snapshots/</url>
>       <releases>
>          <enabled>false</enabled>
>       </releases>
>       <snapshots>
>          <enabled>true</enabled>
>       </snapshots>
>    </repository>
> </repositories>
> 
> <properties>
>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>    <flink.version>1.9.0</flink.version>
>    <scala.binary.version>2.11</scala.binary.version>
>    <scala.version>2.11.8</scala.version>
> </properties>
> 
> <dependencies>
>    <!-- Apache Flink dependencies -->
>    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
> 
>    <!-- Scala Library, provided by Flink as well. -->
>    <dependency>
>       <groupId>org.scala-lang</groupId>
>       <artifactId>scala-library</artifactId>
>       <version>${scala.version}</version>
>    </dependency>
> 
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>       <scope>compile</scope>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
> 
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-common</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
> 
> 
> 
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>       <version>${flink.version}</version>
>    </dependency>
> 
>    <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-log4j12</artifactId>
>       <version>1.7.7</version>
>       <scope>runtime</scope>
>    </dependency>
>    <dependency>
>       <groupId>log4j</groupId>
>       <artifactId>log4j</artifactId>
>       <version>1.2.17</version>
>       <scope>runtime</scope>
>    </dependency>
> </dependencies>
> <屏幕快照 2019-08-26 17.54.22.png>
> 
> 
>> On Aug 26, 2019, at 17:39, Jark Wu <imj...@gmail.com> wrote:
>> 
>> Does your pom.xml include a dependency on flink-table-planner-blink_2.11? Double-check the version number. If you want to run it in the IDE, make sure the dependency is not marked as <scope>provided</scope>.
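>> 
>> For running in the IDE, the planner dependency should look roughly like this (a sketch; adapt the version to your pom properties):
>> 
>> <dependency>
>>    <groupId>org.apache.flink</groupId>
>>    <artifactId>flink-table-planner-blink_2.11</artifactId>
>>    <version>1.9.0</version>
>>    <!-- no <scope>provided</scope> here, so the planner stays on the IDE classpath -->
>> </dependency>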
>> 
>> 
>>> On Aug 26, 2019, at 17:25, ddwcg <3149768...@qq.com> wrote:
>>> 
>>> 
>>> Hi, I specified the blink planner, but I still get the same error:
>>> object StreamingJob {
>>>   def main(args: Array[String]) {
>>>     val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>     val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>     val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>>>     //val bsTableEnv2 = TableEnvironment.create(bsSettings)
>>>     bsEnv.execute("jobname")
>>>   }
>>> }
>>> 
>>> 
>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could 
>>> not instantiate the executor. Make sure a planner module is on the classpath
>>>     at 
>>> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>>     at 
>>> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>>     at 
>>> org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>>     at com.test.StreamingJob$.main(StreamingJob.scala:13)
>>>     at com.test.StreamingJob.main(StreamingJob.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: 
>>> Could not find a suitable table factory for 
>>> 'org.apache.flink.table.delegation.ExecutorFactory' in
>>> the classpath.
>>> 
>>> Reason: No factory implements 
>>> 'org.apache.flink.table.delegation.ExecutorFactory'.
>>> 
>>> The following properties are requested:
>>> batch-mode=false
>>> class-name=org.apache.flink.table.executor.BlinkExecutorFactory
>>> 
>>> The following factories have been considered:
>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>> 
>>> 
>>> 
>>>> On Aug 26, 2019, at 15:07, Jark Wu <imj...@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> This error is not a Kafka problem; it is a problem with how the planner is used. If you want to use the blink planner, you need to declare it via EnvironmentSettings.
>>>> For details, see:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
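>>>> 
>>>> A minimal sketch of that declaration (assuming Flink 1.9 and the Scala bridge API):
>>>> 
>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>> import org.apache.flink.table.api.EnvironmentSettings
>>>> import org.apache.flink.table.api.scala.StreamTableEnvironment
>>>> 
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>> val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>> val tableEnv = StreamTableEnvironment.create(env, settings)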
>>>> 
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>>>> On Aug 26, 2019, at 14:56, ddwcg <3149768...@qq.com> wrote:
>>>>> 
>>>>> Hi everyone,
>>>>> I ran into a few issues after upgrading to 1.9:
>>>>> 1. Our current production version is 1.7.2, and the old code used FlinkKafkaConsumer011 as the Kafka consumer:
>>>>> 
>>>>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
>>>>> but this class can no longer be found.
>>>>> 
>>>>> 2. So I switched to FlinkKafkaConsumer:
>>>>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
>>>>> I am not sure which Kafka version this consumer corresponds to under the hood.
>>>>> 
>>>>> 3. After switching to FlinkKafkaConsumer I get an error, and I have to add flink-table-api-java-bridge_${scala.binary.version},
>>>>> otherwise it fails with a missing class: Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
>>>>> 
>>>>> 
>>>>> 
>>>>> After adding flink-table-api-java-bridge_${scala.binary.version}, it still fails:
>>>>> Exception in thread "main" org.apache.flink.table.api.TableException: 
>>>>> Could not instantiate the executor. Make sure a planner module is on the 
>>>>> classpath
>>>>>   at 
>>>>> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>>>>   at 
>>>>> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>>>>   at 
>>>>> org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>>>>   at 
>>>>> org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
>>>>>   at com.test.StreamingJob$.main(StreamingJob.scala:52)
>>>>>   at com.test.StreamingJob.main(StreamingJob.scala)
>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>   at 
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>   at 
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: 
>>>>> Could not find a suitable table factory for 
>>>>> 'org.apache.flink.table.delegation.ExecutorFactory' in
>>>>> the classpath.
>>>>> 
>>>>> Reason: No factory implements 
>>>>> 'org.apache.flink.table.delegation.ExecutorFactory'.
>>>>> 
>>>>> The following properties are requested:
>>>>> batch-mode=false
>>>>> 
>>>>> The following factories have been considered:
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>>>> 
>>>>> 
>>>>> My pom file is as follows:
>>>>> 
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-scala_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>> 
>>>>> <!-- Scala Library, provided by Flink as well. -->
>>>>> <dependency>
>>>>> <groupId>org.scala-lang</groupId>
>>>>> <artifactId>scala-library</artifactId>
>>>>> <version>${scala.version}</version>
>>>>> <scope>provided</scope>
>>>>> </dependency>
>>>>> 
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-connector-kafka_2.11</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> <scope>compile</scope>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-common</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> 
>>>>> 
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>> <version>${flink.version}</version>
>>>>> </dependency>
>>>>> 
>>>>> 
>>>>> Thanks, everyone.
>>>> 
>>> 
>> 
>> 
> 
