all those are verified.

the issue is fixed by adding
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
to org.apache.flink.table.factories.Factory.

Thanks,
Fanbin



On Tue, Nov 17, 2020 at 7:29 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Please verify that:
> 1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf
> your-program.jar | grep KafkaDynamicTableFactory")
> 2. kafka-connector version matches the version of Flink distribution on
> EMR.
>
> Regards,
> Roman
>
>
> On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> Hi,
>>
>> I could not launch my flink 1.11.2 application on EMR with exception
>>
>> Caused by: org.apache.flink.table.api.ValidationException:
>> Could not find any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.
>>
>> I attached the full log at the end. After checking some other threads and
>> none applies in my case. here is my observation:
>>
>> 1. dependency check: both flink-connector-kafka and flink-json are
>> included in the final fat jar.
>> 2.
>> resources/META-INF/services/org.apache.flink.table.factories.TableFactory
>> has the following and is included in the final fat jar.
>>   - org.apache.flink.formats.json.JsonRowFormatFactory
>>   -
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>   also noticed that only identifier datagen is shown in the log. No
>> kafka or json in there.
>> 3. local IntelliJ running fine.
>> 4. same jar on EMR not working
>>
>> Please advise.
>> Thanks,
>> Fanbin
>>
>>
>>
>>
>> Caused by: org.apache.flink.table.api.ValidationException: Unable to
>> create a source for reading table
>> 'default_catalog.default_database.analytics_service'.
>>
>> Table options are:
>>
>> 'connector'='kafka'
>> 'format'='json'
>> 'json.ignore-parse-errors'='true'
>> 'properties.bootstrap.servers'='localhost:9093'
>> 'properties.group.id'='xxx'
>> 'properties.security.protocol'='SSL'
>> 'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
>> 'properties.ssl.key.password'='secret'
>> 'properties.ssl.keystore.location'='xxx.jks'
>> 'properties.ssl.keystore.password'='secret'
>> 'properties.ssl.keystore.type'='JKS'
>> 'properties.ssl.truststore.location'='xxx.jks'
>> 'properties.ssl.truststore.password'='secret'
>> 'properties.ssl.truststore.type'='JKS'
>> 'properties.zookeeper.connect'='localhost:2181'
>> 'scan.startup.mode'='earliest-offset'
>> 'topic'='events'
>> at
>> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>> at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
>> at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
>> at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
>> at
>> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
>> at
>> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> ... 11 more
>> Caused by: org.apache.flink.table.api.ValidationException: Cannot
>> discover a connector using option ''connector'='kafka''.
>> at
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>> at
>> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
>> ... 43 more
>>
>> *Caused by: org.apache.flink.table.api.ValidationException: Could not
>> find any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.*
>> Available factory identifiers are:
>>
>> *datagen*
>> at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> at
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> ... 44 more
>>
>>
>>
>>

Reply via email to