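Judging from your error, Kafka010TableSourceSinkFactory is not on the classpath: the "factories have been considered" list in the exception contains no Kafka factory at all. You need to add the Kafka connector jar to your dependencies (for version 0.10, that is flink-connector-kafka-0.10_2.11 or flink-connector-kafka-0.10_2.12, matching your Scala version).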
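
If it helps to confirm this before and after changing the dependencies, here is a minimal sketch of a classpath check using only the JDK. The fully qualified class name is my assumption about where the factory lives in the 0.10 connector jar, and ClasspathCheck / isOnClasspath are hypothetical names for illustration only, so please verify the class name against the jar you actually ship.

```java
public class ClasspathCheck {
    // Assumption: package of the factory inside flink-connector-kafka-0.10.
    // Verify this against the connector jar you are using.
    static final String KAFKA_010_FACTORY =
        "org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory";

    // Returns true if the named class can be located by this class's class loader.
    // The class is looked up without running its static initializers.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but one of its own dependencies is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(KAFKA_010_FACTORY + " on classpath: "
            + isOnClasspath(KAFKA_010_FACTORY));
    }
}
```

Calling isOnClasspath(KAFKA_010_FACTORY) at the start of your main method, inside the jar you submit with flink run, should tell you whether the connector classes are visible to the job at the point where registerTableSource is called.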
> On Sep 10, 2019, at 12:31 PM, 越张 <[email protected]> wrote:
>
> Code:
> EnvironmentSettings bsSettings =
>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamExecutionEnvironment streamEnv =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings);
>
> tableEnv.connect(new Kafka()
>         .version("0.10")
>         .topic("installmentdb_t_user")
>         .startFromEarliest()
>         .property("zookeeper.connect", "risk-kafka.aku:2181")
>         .property("bootstrap.servers", "risk-kafka.aku:9092"))
>     .withFormat(new Json().deriveSchema())
>     .withSchema(new Schema()
>         .field("business", Types.STRING)
>         .field("type", Types.STRING)
>         .field("es", Types.LONG)
>     )
>     .inAppendMode().registerTableSource("installmentdb_t_user");
>
>
> Starting execution of program
>
> ------------------------------------------------------------
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>     at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>     at feature.flinktask.sqltest.main(sqltest.java:39)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>     ... 12 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: No context matches.
>
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=risk-kafka.aku:2181
> connector.properties.1.key=bootstrap.servers
> connector.properties.1.value=risk-kafka.aku:9092
> connector.property-version=1
> connector.startup-mode=earliest-offset
> connector.topic=installmentdb_t_user
> connector.type=kafka
> connector.version=0.10
> format.derive-schema=true
> format.property-version=1
> format.type=json
> schema.0.name=business
> schema.0.type=VARCHAR
> schema.1.name=type
> schema.1.type=VARCHAR
> schema.2.name=es
> schema.2.type=BIGINT
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.table.planner.StreamPlannerFactory
> org.apache.flink.table.executor.StreamExecutorFactory
> org.apache.flink.table.planner.delegation.BlinkPlannerFactory
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
>     ... 20 more
