???? lib ???????????????? ????????



------------------ Original message ------------------
From: "AS"<allensu...@163.com>;
Sent: 2020-01-15 (Wednesday) 4:19 PM
To: "user-zh@flink.apache.org"<user-zh@flink.apache.org>;

Subject: ????????????:flink ????kafka source ????????????



Hi:
    ??????????????????????, ????????kafka??factory????????.
    ?? https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies ????jar????flink??lib??????(????????????????????) ??????.
    ??????????.






On 2020-01-15 14:59, Others<41486...@qq.com> wrote:
The Flink version I am using is 1.9.1
??????????????????????????????????????
2020-01-15 11:57:44,255 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
    ... 17 more


Below is the pom file:


<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<profiles>
    <profile>
        <id>dev</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <properties>
            <project.scope>compile</project.scope>
        </properties>
    </profile>
    <profile>
        <id>pro</id>
        <properties>
            <project.scope>provided</project.scope>
        </properties>
    </profile>
</profiles>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.9.1</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>${project.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${project.scope}</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- mysql -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.48</version>
    </dependency>
    <!-- Gson -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

<build>
    <plugins>

        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>


</build>
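
One possible cause worth checking, offered as an assumption and not a confirmed diagnosis for this job: Flink 1.9 discovers table factories through META-INF/services files. Both flink-connector-kafka and flink-json ship such a file under the same name. When the shade plugin builds a fat jar without merging them, only one copy may survive, which would match the error above: KafkaTableSourceSinkFactory is listed but no JSON format factory is. A minimal sketch of the transformers section with the shade plugin's ServicesResourceTransformer added, everything else as in the pom above:

```xml
<transformers>
    <!-- Assumption: merging META-INF/services entries from all shaded
         dependencies keeps both the Kafka and the JSON table factories
         discoverable in the fat jar. -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
        <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
    </transformer>
</transformers>
```

After rebuilding, the file META-INF/services/org.apache.flink.table.factories.TableFactory inside the fat jar should list the factories of every connector and format that was shaded in. If the connector and format jars are placed in Flink's lib directory instead, this merge is not needed for them.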


???? ???????????????? ????
