Hi,

I could not launch my Flink 1.11.2 application on EMR; it fails with this exception:

Caused by: org.apache.flink.table.api.ValidationException:
Could not find any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.

I attached the full log at the end. I checked some other threads, but none of
them applies to my case. Here are my observations:

1. Dependency check: both flink-connector-kafka and flink-json are included
in the final fat jar.
2. resources/META-INF/services/org.apache.flink.table.factories.TableFactory
is included in the final fat jar and contains the following entries:
  - org.apache.flink.formats.json.JsonRowFormatFactory
  - org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
  I also noticed that the log lists only the identifier datagen as available;
  neither kafka nor json appears.
3. Running locally in IntelliJ works fine.
4. The same jar does not work on EMR.
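
To double-check points 1 and 2 against the jar that is actually deployed,
here is a minimal sketch that lists the factory service files inside a jar.
It is only an illustration under stated assumptions: the function name
list_service_entries is made up for this example, and the two file names are
the ones that, as far as I understand, Flink's ServiceLoader-based discovery
reads. The "Factory" file is the one tied to the DynamicTableSourceFactory
lookup named in the exception; "TableFactory" belongs to the older interface.

```python
import zipfile

# Assumption: these are the service files consulted by Flink's table factory
# discovery. "Factory" covers the newer DynamicTableSourceFactory lookup,
# "TableFactory" the legacy interface.
SERVICE_FILES = (
    "META-INF/services/org.apache.flink.table.factories.Factory",
    "META-INF/services/org.apache.flink.table.factories.TableFactory",
)


def list_service_entries(jar_path):
    """Return {service_file: [class names]} for each service file found in the jar."""
    found = {}
    with zipfile.ZipFile(jar_path) as jar:
        names = set(jar.namelist())
        for service_file in SERVICE_FILES:
            if service_file not in names:
                # A missing file means no factory of that kind is registered.
                continue
            text = jar.read(service_file).decode("utf-8")
            entries = []
            for line in text.splitlines():
                # Service files allow '#' comments and blank lines; skip both.
                line = line.split("#", 1)[0].strip()
                if line:
                    entries.append(line)
            found[service_file] = entries
    return found
```

Calling list_service_entries on the fat jar (the path is whatever the build
produces) returns only the service files that are present, so a missing
"Factory" key, or one without a Kafka entry, would match the log showing only
datagen.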

Please advise.
Thanks,
Fanbin




Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.analytics_service'.

Table options are:

'connector'='kafka'
'format'='json'
'json.ignore-parse-errors'='true'
'properties.bootstrap.servers'='localhost:9093'
'properties.group.id'='xxx'
'properties.security.protocol'='SSL'
'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
'properties.ssl.key.password'='secret'
'properties.ssl.keystore.location'='xxx.jks'
'properties.ssl.keystore.password'='secret'
'properties.ssl.keystore.type'='JKS'
'properties.ssl.truststore.location'='xxx.jks'
'properties.ssl.truststore.password'='secret'
'properties.ssl.truststore.type'='JKS'
'properties.zookeeper.connect'='localhost:2181'
'scan.startup.mode'='earliest-offset'
'topic'='events'
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
... 43 more

*Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.*
Available factory identifiers are:

*datagen*
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 44 more
