Never mind, I figured it out: the connector arguments were wrong.
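
For anyone who finds this thread with the same error: the message above does not say which arguments were wrong. One plausible reading is that the DDL uses the legacy 'connector.type' style keys, which are resolved through the old TableSourceFactory lookup shown in the stack trace. The sketch below assumes that was the cause and rewrites the same table with the option keys of the current Kafka SQL connector. The helper legacy_option_keys is hypothetical and is included only to make the difference easy to check.

```python
import re

# Sketch only: assumes the fix was to move from the legacy
# 'connector.type' / 'format.type' keys to the current option keys.
# Table name, columns, topic, and broker address are taken from the
# original DDL in this thread.
create_kafka_source_ddl = """
        CREATE TABLE payment_msg(
            createTime VARCHAR,
            orderId BIGINT,
            payAmount DOUBLE,
            payPlatform INT,
            provinceId INT
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'payment_msg',
          'properties.bootstrap.servers' = 'kafka:9092',
          'properties.group.id' = 'test_3',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """


def legacy_option_keys(ddl):
    """Hypothetical helper: list option keys that still use the legacy
    'connector.*' or 'format.*' naming."""
    keys = re.findall(r"'([^']+)'\s*=", ddl)
    return [k for k in keys if k.startswith(("connector.", "format."))]
```

With PyFlink, the statement would then be submitted with t_env.execute_sql(create_kafka_source_ddl), where t_env is an existing TableEnvironment. The Kafka SQL connector jar also has to be on the classpath, otherwise the factory lookup fails in a similar way.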

On Tue, May 10, 2022 at 11:19 PM Shubham Bansal <
[email protected]> wrote:

> Hi Everyone,
>
> I am trying to fix the flink-playground for version 1.14.4. While working
> on pyflink-walkthrough, I am getting the following error:
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The following properties are requested:
> connector.properties.bootstrap.servers=kafka:9092
> connector.properties.group.id=test_3
> connector.startup-mode=latest-offset
> connector.topic=payment_msg
> connector.type=kafka
> connector.version=universal
> format=json
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=createTime
> schema.1.data-type=BIGINT
> schema.1.name=orderId
> schema.2.data-type=DOUBLE
> schema.2.name=payAmount
> schema.3.data-type=INT
> schema.3.name=payPlatform
> schema.4.data-type=INT
> schema.4.name=provinceId
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:315)
>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:193)
>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:154)
>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:108)
>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:41)
>     ... 59 more
>
> while executing the following DDL query
>
> create_kafka_source_ddl = """
>         CREATE TABLE payment_msg(
>             createTime VARCHAR,
>             orderId BIGINT,
>             payAmount DOUBLE,
>             payPlatform INT,
>             provinceId INT
>         ) WITH (
>           'connector.type' = 'kafka',
>           'connector.version' = 'universal',
>           'connector.topic' = 'payment_msg',
>           'connector.properties.bootstrap.servers' = 'kafka:9092',
>           'connector.properties.group.id' = 'test_3',
>           'connector.startup-mode' = 'latest-offset',
>           'format.type' = 'json'
>         )
>         """
>
> I am not sure why it is not looking for a JSON parsing factory and is
> instead considering only the deprecated CSV factories. Can anybody help
> with this? I would really appreciate it.
>
> Thanks,
> Shubham
>
>
