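The fact that it only works once the jar is placed under lib does sound like a bug, but could you tell us your Flink version and the exact command you use to start the job?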
FLINK-13749 may not be included in earlier versions. Also, Standalone classloading was changed for the PerJob case.

Best,
tison.


On Wed, Apr 22, 2020 at 5:48 PM, tison <[email protected]> wrote:

> Check whether the UberJar you built contains a file
>
> META-INF/services/org.apache.flink.table.factories.TableFactory
>
> whose content includes
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>
> Best,
> tison.
>
>
> On Wed, Apr 22, 2020 at 5:30 PM, 宇张 <[email protected]> wrote:
>
>> I am running a Flink job in Standalone mode, but the TableSourceFactory
>> inside the Uber Jar cannot be loaded, even with
>> classloader.resolve-order: child-first set. It is only found when the jar
>> is placed in the lib directory. I saw in the release notes that the
>> classloading policy was changed, but I do not understand why the Factory
>> inside the Uber Jar cannot be loaded.
>>
>> Flink Client respects Classloading Policy (FLINK-13749
>> <https://issues.apache.org/jira/browse/FLINK-13749>)
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749>
>>
>> The Flink client now also respects the configured classloading policy,
>> i.e., parent-first or child-first classloading. Previously, only cluster
>> components such as the job manager or task manager supported this setting.
>> This does mean that users might get different behaviour in their programs,
>> in which case they should configure the classloading policy explicitly to
>> use parent-first classloading, which was the previous (hard-coded)
>> behaviour.
>>
>> Exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>     at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>>     at com.akulaku.data.main.StreamMain.main(StreamMain.java:87)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>     ... 8 more
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>>
>> Reason: Required context properties mismatch.
>>
>> The matching candidates:
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> Mismatched properties:
>> 'connector.type' expects 'filesystem', but is 'kafka'
>> 'format.type' expects 'csv', but is 'json'
>>
>> The following properties are requested:
>> connector.properties.bootstrap.servers=centos:9092
>> connector.properties.zookeeper.connect=centos:2181
>> connector.startup-mode=earliest-offset
>> connector.topic=test
>> connector.type=kafka
>> connector.version=0.11
>> format.type=json
>> schema.0.data-type=VARCHAR(2147483647)
>> schema.0.name=bus
>> schema.1.data-type=BIGINT
>> schema.1.name=ts
>> schema.2.data-type=VARCHAR(2147483647)
>> schema.2.name=type
>> schema.3.data-type=BIGINT
>> schema.3.name=putRowNum
>> schema.4.data-type=TIMESTAMP(3) NOT NULL
>> schema.4.expr=PROCTIME()
>> schema.4.name=proctime
>> update-mode=append
>>
>> The following factories have been considered:
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>
>
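The check suggested in the thread (whether the UberJar contains META-INF/services/org.apache.flink.table.factories.TableFactory and whether that file lists the Kafka factory) could be done with a small JDK-only program along these lines. This is only a sketch: the class name ServiceFileCheck and the jar path passed on the command line are hypothetical, and it assumes the file follows the usual provider-configuration layout of one class name per line with '#' starting a comment.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: inspects a jar's service file.
class ServiceFileCheck {

    // Both names are taken from the thread above.
    static final String SERVICE_ENTRY =
            "META-INF/services/org.apache.flink.table.factories.TableFactory";
    static final String KAFKA_FACTORY =
            "org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory";

    /** Extracts class names from service-file lines, dropping '#' comments and blank lines. */
    static List<String> parseProviders(List<String> lines) {
        List<String> providers = new ArrayList<>();
        for (String line : lines) {
            int hash = line.indexOf('#');
            String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!name.isEmpty()) {
                providers.add(name);
            }
        }
        return providers;
    }

    /** Returns the factories listed in the jar's service file, or an empty list if the file is absent. */
    static List<String> providersInJar(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_ENTRY);
            if (entry == null) {
                return Collections.emptyList();
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                return parseProviders(reader.lines().collect(Collectors.toList()));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ServiceFileCheck <path-to-uber-jar>");
            return;
        }
        List<String> providers = providersInJar(args[0]);
        providers.forEach(p -> System.out.println("listed: " + p));
        System.out.println("Kafka factory listed: " + providers.contains(KAFKA_FACTORY));
    }
}
```

If the Kafka factory is not listed, the factory lookup would have nothing to discover from the Uber Jar, which would be consistent with the "factories have been considered" list in the exception containing only the Csv factories. If it is listed, the problem more likely lies in which classloader performs the lookup.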
