是的,是这个问题,发现包打在胖包里面了,但是找不到,把包放在flink lib 目录下就好了,很奇怪
> 在 2019年9月11日,上午9:35,Dian Fu <[email protected]> 写道: > > 看你的报错,Kafka010TableSourceSinkFactory不在classpath里,需要把kafka > connector的jar(0.10需要依赖flink-connector-kafka-0.10_2.11或者flink-connector-kafka-0.10_2.12)放到依赖里。 > > >> 在 2019年9月10日,下午12:31,越张 <[email protected]> 写道: >> >> 代码: >> EnvironmentSettings bsSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >> StreamExecutionEnvironment streamEnv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, >> bsSettings); >> >> tableEnv.connect( new Kafka() >> .version("0.10") >> .topic("installmentdb_t_user") >> .startFromEarliest() >> .property("zookeeper.connect", "risk-kafka.aku:2181") >> .property("bootstrap.servers", "risk-kafka.aku:9092")) >> .withFormat(new Json().deriveSchema()) >> .withSchema(new Schema() >> .field("business", Types.STRING) >> .field("type", Types.STRING) >> .field("es", Types.LONG) >> ) >> .inAppendMode().registerTableSource("installmentdb_t_user"); >> >> >> >> >> Starting execution of program >> >> ------------------------------------------------------------ >> The program finished with the following exception: >> >> org.apache.flink.client.program.ProgramInvocationException: The main method >> caused an error: findAndCreateTableSource failed. >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) >> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) >> at >> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) >> at >> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) >> at >> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) >> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) >> at >> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) >> at >> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) >> at java.security.AccessController.doPrivileged(Native Method) >> at javax.security.auth.Subject.doAs(Subject.java:422) >> at >> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917) >> at >> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) >> Caused by: org.apache.flink.table.api.TableException: >> findAndCreateTableSource failed. >> at >> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67) >> at >> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54) >> at >> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69) >> at feature.flinktask.sqltest.main(sqltest.java:39) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) >> ... 12 more >> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could >> not find a suitable table factory for >> 'org.apache.flink.table.factories.TableSourceFactory' in >> the classpath. >> >> Reason: No context matches. >> >> The following properties are requested: >> connector.properties.0.key=zookeeper.connect >> connector.properties.0.value=risk-kafka.aku:2181 >> connector.properties.1.key=bootstrap.servers >> connector.properties.1.value=risk-kafka.aku:9092 >> connector.property-version=1 >> connector.startup-mode=earliest-offset >> connector.topic=installmentdb_t_user >> connector.type=kafka >> connector.version=0.10 >> format.derive-schema=true >> format.property-version=1 >> format.type=json >> schema.0.name=business >> schema.0.type=VARCHAR >> schema.1.name=type >> schema.1.type=VARCHAR >> schema.2.name=es >> schema.2.type=BIGINT >> update-mode=append >> >> The following factories have been considered: >> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory >> org.apache.flink.table.sources.CsvBatchTableSourceFactory >> org.apache.flink.table.sources.CsvAppendTableSourceFactory >> org.apache.flink.table.sinks.CsvBatchTableSinkFactory >> org.apache.flink.table.sinks.CsvAppendTableSinkFactory >> org.apache.flink.table.planner.StreamPlannerFactory >> org.apache.flink.table.executor.StreamExecutorFactory >> org.apache.flink.table.planner.delegation.BlinkPlannerFactory >> org.apache.flink.table.planner.delegation.BlinkExecutorFactory >> org.apache.flink.formats.json.JsonRowFormatFactory >> at >> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283) >> at >> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191) >> at >> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144) >> at >> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97) >> at >> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64) >> ... 20 more >
