all those are verified. the issue is fixed by adding org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory to org.apache.flink.table.factories.Factory.
Thanks, Fanbin On Tue, Nov 17, 2020 at 7:29 AM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > Hi, > > Please verify that: > 1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf > your-program.jar | grep KafkaDynamicTableFactory") > 2. kafka-connector version matches the version of Flink distribution on > EMR. > > Regards, > Roman > > > On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu <fanbin...@coinbase.com> wrote: > >> Hi, >> >> I could not launch my flink 1.11.2 application on EMR with exception >> >> Caused by: org.apache.flink.table.api.ValidationException: >> Could not find any factory for identifier 'kafka' that implements >> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the >> classpath. >> >> I attached the full log at the end. After checking some other threads and >> none applies in my case. here is my observation: >> >> 1. dependency check: both flink-connector-kafka and flink-json are >> included in the final fat jar. >> 2. >> resources/META-INF/services/org.apache.flink.table.factories.TableFactory >> has the following and is included in the final fat jar. >> - org.apache.flink.formats.json.JsonRowFormatFactory >> - >> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory >> also noticed that only identifier datagen is shown in the log. No >> kafka or json in there. >> 3. local IntelliJ running fine. >> 4. same jar on EMR not working >> >> Please advise. >> Thanks, >> Fanbin >> >> >> >> >> Caused by: org.apache.flink.table.api.ValidationException: Unable to >> create a source for reading table >> 'default_catalog.default_database.analytics_service'. >> >> Table options are: >> >> 'connector'='kafka' >> 'format'='json' >> 'json.ignore-parse-errors'='true' >> 'properties.bootstrap.servers'='localhost:9093' >> 'properties.group.id'='xxx' >> 'properties.security.protocol'='SSL' >> 'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1' >> 'properties.ssl.key.password'='secret' >> 'properties.ssl.keystore.location'='xxx.jks' >> 'properties.ssl.keystore.password'='secret' >> 'properties.ssl.keystore.type'='JKS' >> 'properties.ssl.truststore.location'='xxx.jks' >> 'properties.ssl.truststore.password'='secret' >> 'properties.ssl.truststore.type'='JKS' >> 'properties.zookeeper.connect'='localhost:2181' >> 'scan.startup.mode'='earliest-offset' >> 'topic'='events' >> at >> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125) >> at >> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140) >> at >> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) >> at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) >> at >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664) >> at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133) >> at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36) >> at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30) >> at >> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76) >> at >> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) >> ... 11 more >> Caused by: org.apache.flink.table.api.ValidationException: Cannot >> discover a connector using option ''connector'='kafka''. >> at >> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) >> at >> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) >> ... 43 more >> >> *Caused by: org.apache.flink.table.api.ValidationException: Could not >> find any factory for identifier 'kafka' that implements >> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the >> classpath.* >> Available factory identifiers are: >> >> *datagen* >> at >> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) >> at >> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) >> ... 44 more >> >> >> >>