Hi,

Did you forget to put the JSON jar into Flink's lib directory? It also looks like your user jar was not built with jar-with-dependencies, so it will not contain the JSON jar either.
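
If you go the fat-jar route, a minimal sketch of jar-with-dependencies packaging could look like the plugin below (the assembly-plugin version is only an example, and the main class is copied from the shade config in your pom); alternatively, you can drop the flink-json jar matching your Flink version into the cluster's lib/ directory:

   <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <!-- example version, not verified against your setup -->
      <version>3.1.1</version>
      <configuration>
         <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
         </descriptorRefs>
         <archive>
            <manifest>
               <!-- main class taken from the shade config in your pom -->
               <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
            </manifest>
         </archive>
      </configuration>
      <executions>
         <execution>
            <!-- build the *-jar-with-dependencies.jar during the package phase -->
            <phase>package</phase>
            <goals>
               <goal>single</goal>
            </goals>
         </execution>
      </executions>
   </plugin>

Whichever option you choose, the jar that the cluster actually runs (or its lib/ classpath) needs to contain the JSON format factory from flink-json.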

Best,
Jingsong Lee


------------------------------------------------------------------
From:Others <41486...@qq.com>
Send Time: Wednesday, January 15, 2020, 15:03
To:user-zh <user-zh@flink.apache.org>
Subject: Help request: error when deploying a Flink job with a Kafka source to the cluster

I am using Flink version 1.9.1.
The job runs fine when debugging locally, but the following error is thrown on startup when it is deployed to the cluster:
2020-01-15 11:57:44,255 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
    ... 17 more


Below is the pom content:


<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<profiles>
   <profile>
      <id>dev</id>
      <activation>
         <activeByDefault>true</activeByDefault>
      </activation>
      <properties>
         <project.scope>compile</project.scope>
      </properties>
   </profile>
   <profile>
      <id>pro</id>
      <properties>
         <project.scope>provided</project.scope>
      </properties>
   </profile>
</profiles>
<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.9.1</flink.version>
   <java.version>1.8</java.version>
   <scala.binary.version>2.11</scala.binary.version>
   <maven.compiler.source>${java.version}</maven.compiler.source>
   <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<repositories>
   <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
         <enabled>false</enabled>
      </releases>
      <snapshots>
         <enabled>true</enabled>
      </snapshots>
   </repository>
</repositories>

<dependencies>
   <!-- Apache Flink dependencies -->
   <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>${project.scope}</scope>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.scope}</scope>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <!-- mysql -->
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.48</version>
   </dependency>
   <!-- Gson -->
   <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.5</version>
   </dependency>
   <!-- Add logging framework, to produce console output when running in the IDE. -->
   <!-- These dependencies are excluded from the application JAR by default. -->
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
   </dependency>
   <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
   </dependency>
</dependencies>

<build>
   <plugins>

      <!-- Java Compiler -->
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.1</version>
         <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
         </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <version>3.0.0</version>
         <executions>
            <!-- Run shade goal on package phase -->
            <execution>
               <phase>package</phase>
               <goals>
                  <goal>shade</goal>
               </goals>
               <configuration>
                  <artifactSet>
                     <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                     </excludes>
                  </artifactSet>
                  <filters>
                     <filter>
                        <!-- Do not copy the signatures in the META-INF folder.
                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                        <artifact>*:*</artifact>
                        <excludes>
                           <exclude>META-INF/*.SF</exclude>
                           <exclude>META-INF/*.DSA</exclude>
                           <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                     </filter>
                  </filters>
                  <transformers>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
                     </transformer>
                  </transformers>
               </configuration>
            </execution>
         </executions>
      </plugin>
   </plugins>

</build>


How should I resolve this? Thanks.
