Hi Everyone,
I am trying to fix the flink-playground for version 1.14.4. While working
on pyflink-walkthrough, I am getting the following error:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.properties.bootstrap.servers=kafka:9092
connector.properties.group.id=test_3
connector.startup-mode=latest-offset
connector.topic=payment_msg
connector.type=kafka
connector.version=universal
format=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=createTime
schema.1.data-type=BIGINT
schema.1.name=orderId
schema.2.data-type=DOUBLE
schema.2.name=payAmount
schema.3.data-type=INT
schema.3.name=payPlatform
schema.4.data-type=INT
schema.4.name=provinceId
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:315)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:193)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:154)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:108)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:41)
    ... 59 more
The error occurs while executing the following DDL query:
create_kafka_source_ddl = """
CREATE TABLE payment_msg(
createTime VARCHAR,
orderId BIGINT,
payAmount DOUBLE,
payPlatform INT,
provinceId INT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'payment_msg',
'connector.properties.bootstrap.servers' = 'kafka:9092',
'connector.properties.group.id' = 'test_3',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
)
"""
I am not sure why it is not looking for the JSON format factory and is
instead considering only the deprecated CSV table source factories. Can
anybody help with this? I would really appreciate it.
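
For comparison, here is a minimal sketch of the same table written with the
newer option keys ('connector', 'topic', 'properties.*', 'scan.startup.mode',
'format') instead of the legacy 'connector.type' style. Whether this is the
cause of the error is only an assumption and has not been verified against the
playground. The variable name create_kafka_source_ddl_new is hypothetical, and
the Kafka SQL connector jar would presumably still need to be on the classpath.

```python
# Hypothetical rewrite of the DDL above using the non-legacy option keys.
# Assumption: the Kafka SQL connector jar is available to the job.
create_kafka_source_ddl_new = """
    CREATE TABLE payment_msg(
        createTime VARCHAR,
        orderId BIGINT,
        payAmount DOUBLE,
        payPlatform INT,
        provinceId INT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'payment_msg',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'test_3',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
"""

# The string would be passed to the table environment the same way as the
# original DDL, e.g. t_env.execute_sql(create_kafka_source_ddl_new).
```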
Thanks,
Shubham