是的,是这个问题,发现包打在胖包里面了,但是找不到,把包放在flink lib 目录下就好了,很奇怪

> 在 2019年9月11日,上午9:35,Dian Fu <[email protected]> 写道:
> 
> 看你的报错,Kafka010TableSourceSinkFactory不在classpath里,需要把kafka 
> connector的jar(0.10需要依赖flink-connector-kafka-0.10_2.11或者flink-connector-kafka-0.10_2.12)放到依赖里。
> 
> 
>> 在 2019年9月10日,下午12:31,越张 <[email protected]> 写道:
>> 
>> 代码:
>> EnvironmentSettings bsSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamExecutionEnvironment streamEnv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
>> bsSettings);
>> 
>> tableEnv.connect(    new Kafka()
>>       .version("0.10")
>>       .topic("installmentdb_t_user")
>>       .startFromEarliest()
>>       .property("zookeeper.connect", "risk-kafka.aku:2181")
>>       .property("bootstrap.servers", "risk-kafka.aku:9092"))
>>       .withFormat(new Json().deriveSchema())
>>       .withSchema(new Schema()
>>               .field("business", Types.STRING)
>>               .field("type", Types.STRING)
>>               .field("es", Types.LONG)
>>       )
>>       .inAppendMode().registerTableSource("installmentdb_t_user");
>> 
>> 
>> 
>> 
>> Starting execution of program
>> 
>> ------------------------------------------------------------
>> The program finished with the following exception:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error: findAndCreateTableSource failed.
>>      at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>>      at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>      at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>      at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>>      at 
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>>      at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>>      at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>>      at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>>      at java.security.AccessController.doPrivileged(Native Method)
>>      at javax.security.auth.Subject.doAs(Subject.java:422)
>>      at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
>>      at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> Caused by: org.apache.flink.table.api.TableException: 
>> findAndCreateTableSource failed.
>>      at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>>      at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>>      at 
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>>      at feature.flinktask.sqltest.main(sqltest.java:39)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>      at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>      at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>      ... 12 more
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
>> not find a suitable table factory for 
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>> 
>> Reason: No context matches.
>> 
>> The following properties are requested:
>> connector.properties.0.key=zookeeper.connect
>> connector.properties.0.value=risk-kafka.aku:2181
>> connector.properties.1.key=bootstrap.servers
>> connector.properties.1.value=risk-kafka.aku:9092
>> connector.property-version=1
>> connector.startup-mode=earliest-offset
>> connector.topic=installmentdb_t_user
>> connector.type=kafka
>> connector.version=0.10
>> format.derive-schema=true
>> format.property-version=1
>> format.type=json
>> schema.0.name=business
>> schema.0.type=VARCHAR
>> schema.1.name=type
>> schema.1.type=VARCHAR
>> schema.2.name=es
>> schema.2.type=BIGINT
>> update-mode=append
>> 
>> The following factories have been considered:
>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>> org.apache.flink.table.planner.StreamPlannerFactory
>> org.apache.flink.table.executor.StreamExecutorFactory
>> org.apache.flink.table.planner.delegation.BlinkPlannerFactory
>> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
>> org.apache.flink.formats.json.JsonRowFormatFactory
>>      at 
>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
>>      at 
>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
>>      at 
>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
>>      at 
>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
>>      at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
>>      ... 20 more
> 

回复