Hi,
> By the way, one more question: following the docs, I used
> `flink-connector-kafka_2.11` and could never get it to run. I later saw that
> others had hit the same problem, and it only worked after I switched to
> `flink-sql-connector-kafka-0.11`. What is the difference between the two?
> If they are different, the docs should state that the Table & SQL API
> needs the latter.

flink-connector-kafka_2.11 is for DataStream API programs.
flink-sql-connector-kafka-0.11_2.11 is for Table API & SQL programs; 0.11 is the Kafka version and 2.11 is the Scala version.

A Table API & SQL program does not need the flink-connector-kafka_2.11 dependency. In your case, remove the DataStream connector dependency, change the SQL connector dependency to flink-sql-connector-kafka-0.11_2.11, and try again.

Best,
Leonard Xu

>
> On Mon, May 25, 2020 at 10:05 AM, macia kk <pre...@gmail.com> wrote:
>
>> build.sbt
>>
>> val flinkVersion = "1.10.0"
>> libraryDependencies ++= Seq(
>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>>   "org.apache.flink" %% "flink-scala" % flinkVersion,
>>   "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,
>>
>>   "org.apache.flink" % "flink-table-common" % flinkVersion,
>>   "org.apache.flink" %% "flink-table-api-scala" % flinkVersion,
>>   "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
>>   "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",
>>
>>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>>   "org.apache.flink" %% "flink-sql-connector-kafka-0.11" % flinkVersion, // <<<<<<<<<<<<<<<<<<<<< Kafka 0.11
>>   "org.apache.flink" % "flink-json" % flinkVersion
>> )
>>
>>
>> On Mon, May 25, 2020 at 9:33 AM, Leonard Xu <xbjt...@gmail.com> wrote:
>>
>>> Hi,
>>> Is the Kafka connector you are using version 0.11? The error looks like a version mismatch.
>>>
>>> Best,
>>> Leonard Xu
>>>
>>>> On May 25, 2020, at 02:44, macia kk <pre...@gmail.com> wrote:
>>>>
>>>> Thanks, I found the answer by searching the earlier mailing list archives. I have now hit a new problem and have been stuck on it for a long time:
>>>>
>>>> Table API, sink to Kafka
>>>>
>>>> val result = bsTableEnv.sqlQuery("SELECT * FROM " + "pppppppp")
>>>>
>>>> bsTableEnv
>>>>   .connect(
>>>>     new Kafka()
>>>>       .version("0.11") // required: valid connector versions are
>>>>       .topic("aaa") // required: topic name from which the table is read
>>>>       .property("zookeeper.connect", "xxx")
>>>>       .property("bootstrap.servers", "yyy")
>>>>   )
>>>>   .withFormat(new Json())
>>>>   .withSchema(new Schema()
>>>>     .field("ts", INT())
>>>>     .field("table", STRING())
>>>>     .field("database", STRING())
>>>>   )
>>>>   .createTemporaryTable("zzzzz")
>>>>
>>>> result.insertInto("mmmmm")
>>>>
>>>> Error:
>>>>
>>>> java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V
>>>>     at org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58)
>>>>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95)
>>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140)
>>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
>>>>     at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
>>>>     at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:74)
>>>>     at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:30)
>>>>     at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1982)
>>>>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>>
>>>>
>>>> Could you please take a look? Thanks.
>>>>
>>>> On Mon, May 25, 2020 at 12:34 AM, Lijie Wang <wangliji...@126.com> wrote:
>>>>
>>>>> Hi, I cannot load the image in your email. Judging from the error below, no matching connector could be found. Please check whether the WITH properties in your DDL are correct.
>>>>>
>>>>>
>>>>>
>>>>> At 2020-05-25 00:11:16, "macia kk" <pre...@gmail.com> wrote:
>>>>>
>>>>> Could someone help me look at this problem? Thanks.
>>>>>
>>>>>
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>> the classpath.
>>>>> Reason: Required context properties mismatch.
>>>>>
>>>>> The matching candidates:
>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>> Mismatched properties:
>>>>> 'connector.type' expects 'filesystem', but is 'kafka'
>>>>> 'format.type' expects 'csv', but is 'json'
>>>>>
>>>>> The following properties are requested:
>>>>> connector.properties.bootstrap.servers=ip-10-128-145-1.idata-server.shopee.io:9092
>>>>> connector.properties.group.id=keystats_aripay
>>>>> connector.property-version=1
>>>>> connector.startup-mode=latest-offset
>>>>> connector.topic=ebisu_wallet_id_db_mirror_v1
>>>>> connector.type=kafka
>>>>> format.property-version=1
>>>>> format.type=json
>>>>> schema.0.data-type=INT
>>>>> schema.0.name=ts
>>>>> schema.1.data-type=VARCHAR(2147483647)
>>>>> schema.1.name=table
>>>>> schema.2.data-type=VARCHAR(2147483647)
>>>>> schema.2.name=database
>>>>> update-mode=append
>>>>>
>>>>> The following factories have been considered:
>>>>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>>>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>>>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>>>>>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>>>>>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>>>>>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>>>>>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>>>>>     ... 39 more
>>>>>
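
For reference, the dependency advice at the top of this thread (drop the DataStream Kafka connector, keep only the SQL connector for Kafka 0.11) could look roughly like the build.sbt below. This is an untested sketch, not a verified build. It assumes the project compiles against Scala 2.11, which the `_2.11` suffixes in the thread suggest; the exact `scalaVersion` value is my assumption and does not come from the original build file. With Scala 2.11, `%%` resolves "flink-sql-connector-kafka-0.11" to flink-sql-connector-kafka-0.11_2.11.

```scala
// build.sbt (sketch): Table API & SQL job using Kafka 0.11 on Flink 1.10.0

// Assumption: Scala 2.11.x, inferred from the _2.11 artifact suffixes in the thread.
scalaVersion := "2.11.12"

val flinkVersion = "1.10.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,

  "org.apache.flink" % "flink-table-common" % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala" % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",

  // "flink-connector-kafka" (the DataStream connector) is intentionally left out.
  // Only the SQL connector for Kafka 0.11 stays on the classpath.
  "org.apache.flink" %% "flink-sql-connector-kafka-0.11" % flinkVersion,
  "org.apache.flink" % "flink-json" % flinkVersion
)
```

The only change from the quoted build.sbt is the removed "flink-connector-kafka" line (plus the explicit `scalaVersion`); everything else is copied as posted.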