》也确认下org.apache.flink.table.factories.TableFactory的内容,里面有没有KafkaTableSourceSinkFactory 这个我看了一下我先前flink1.9的工程,应用程序Jar里面也是没有这个类的,但是程序运行加载是没问题的,这么对比貌似就不是maven打包的问题了。。。。。
On Wed, Apr 22, 2020 at 7:22 PM 宇张 <[email protected]> wrote: > > 》也确认下org.apache.flink.table.factories.TableFactory的内容,里面有没有KafkaTableSourceSinkFactory > 这个没有,只有org.apache.flink.formats.json.JsonRowFormatFactory > 》拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class > 这个能拿到 > > 这么看来 貌似是 mvn打包有问题: > mvn clean package -DskipTests > 依赖范围为默认 > > > On Wed, Apr 22, 2020 at 7:05 PM Jingsong Li <[email protected]> > wrote: > >> Hi, >> >> >> 也确认下org.apache.flink.table.factories.TableFactory的内容,里面有没有KafkaTableSourceSinkFactory >> >> > 这个指的是打印当前线程的classloader嘛Thread.currentThread().getContextClassLoader() >> 是的,拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class >> >> Best, >> Jingsong Lee >> >> On Wed, Apr 22, 2020 at 7:00 PM 宇张 <[email protected]> wrote: >> >> > 看下你打包的 UberJar 里有没一个内容包括 >> > 1、下面这个文件是存在的 >> > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >> > 的文件 >> > META-INF/services/org.apache.flink.table.factories.TableFactory >> > 2、flink版本1.10,Standalone模式启动服务(start-cluster.sh),flink >> > run运行(/software/flink-1.10.0/bin/flink run -c com.data.main.StreamMain >> > ./flink_1.10_test-1.0-jar-with-dependencies.jar) >> > 3、再确认下"TableEnvironmentImpl.sqlQuery"调用时候的ThreadClassLoader? >> > 这个指的是打印当前线程的classloader嘛Thread.currentThread().getContextClassLoader() >> > >> > >> > >> > On Wed, Apr 22, 2020 at 6:00 PM Jingsong Li <[email protected]> >> > wrote: >> > >> > > Hi, >> > > >> > > 先确认下你的Jar包里有没有 meta-inf-services的文件?里面确定有Kafka? >> > > >> > > 如果有,再确认下"TableEnvironmentImpl.sqlQuery"调用时候的ThreadClassLoader? >> > > 因为现在默认是通过ThreadClassLoader来获取Factory的。 >> > > >> > > Best, >> > > Jingsong Lee >> > > >> > > On Wed, Apr 22, 2020 at 5:30 PM 宇张 <[email protected]> wrote: >> > > >> > > > 我这面使用Standalone模式运行Flink任务,但是Uber >> > > > Jar里面的TableSourceFactory不能被加载,即使设置了classloader.resolve-order: >> > > > child-first,只有放在lib目录才能加载得到,我看发布文档跟改了类加载策略,但是我不知道为什么Uber >> > > Jar里面的Factory不能被加载 >> > > > Flink Client respects Classloading Policy (FLINK-13749 >> > > > <https://issues.apache.org/jira/browse/FLINK-13749>) >> > > > < >> > > > >> > > >> > >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749 >> > > > > >> > > > >> > > > The Flink client now also respects the configured classloading >> policy, >> > > > i.e., parent-first or child-first classloading. Previously, only >> > cluster >> > > > components such as the job manager or task manager supported this >> > > setting. >> > > > This does mean that users might get different behaviour in their >> > > programs, >> > > > in which case they should configure the classloading policy >> explicitly >> > to >> > > > use parent-first classloading, which was the previous (hard-coded) >> > > > behaviour. >> > > > >> > > > 异常信息: >> > > > >> > > > rg.apache.flink.client.program.ProgramInvocationException: The >> main >> > > > method caused an error: findAndCreateTableSource failed. >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) >> > > > at >> > > >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) >> > > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) >> > > > at >> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) >> > > > Caused by: org.apache.flink.table.api.TableException: >> > > > findAndCreateTableSource failed. >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) >> > > > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> > > > >> > > > >> > > >> > >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) >> > > > at com.akulaku.data.main.StreamMain.main(StreamMain.java:87) >> > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> > > > at >> > > > >> > > > >> > > >> > >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> > > > at >> > > > >> > > > >> > > >> > >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> > > > at java.lang.reflect.Method.invoke(Method.java:498) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) >> > > > ... 8 more >> > > > Caused by: >> org.apache.flink.table.api.NoMatchingTableFactoryException: >> > > > Could not find a suitable table factory for >> > > > 'org.apache.flink.table.factories.TableSourceFactory' in >> > > > the classpath. >> > > > >> > > > Reason: Required context properties mismatch. >> > > > >> > > > The matching candidates: >> > > > org.apache.flink.table.sources.CsvAppendTableSourceFactory >> > > > Mismatched properties: >> > > > 'connector.type' expects 'filesystem', but is 'kafka' >> > > > 'format.type' expects 'csv', but is 'json' >> > > > >> > > > The following properties are requested: >> > > > connector.properties.bootstrap.servers=centos:9092 >> > > > connector.properties.zookeeper.connect=centos:2181 >> > > > connector.startup-mode=earliest-offset >> > > > connector.topic=test >> > > > connector.type=kafka >> > > > connector.version=0.11 >> > > > format.type=json >> > > > schema.0.data-type=VARCHAR(2147483647) >> > > > schema.0.name=bus >> > > > schema.1.data-type=BIGINT >> > > > schema.1.name=ts >> > > > schema.2.data-type=VARCHAR(2147483647) >> > > > schema.2.name=type >> > > > schema.3.data-type=BIGINT >> > > > schema.3.name=putRowNum >> > > > schema.4.data-type=TIMESTAMP(3) NOT NULL >> > > > schema.4.expr=PROCTIME() >> > > > schema.4.name=proctime >> > > > update-mode=append >> > > > >> > > > The following factories have been considered: >> > > > org.apache.flink.table.sources.CsvBatchTableSourceFactory >> > > > org.apache.flink.table.sources.CsvAppendTableSourceFactory >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) >> > > > at >> > > > >> > > > >> > > >> > >> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) >> > > > >> > > >> > > >> > > -- >> > > Best, Jingsong Lee >> > > >> > >> >> >> -- >> Best, Jingsong Lee >> >
