Never mind. Figured out. Wrong connector arguments. On Tue, May 10, 2022 at 11:19 PM Shubham Bansal < [email protected]> wrote:
> Hi Everyone, > > I am trying to fix the flink-playground for version 1.14.4 and was working > on fixing pyflink-walkthrough and I getting following error > > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: > Could not find a suitable table factory for > 'org.apache.flink.table.factories.TableSourceFactory' in > the classpath. > > Reason: Required context properties mismatch. > > The following properties are requested: > connector.properties.bootstrap.servers=kafka:9092 > connector.properties.group.id=test_3 > connector.startup-mode=latest-offset > connector.topic=payment_msg > connector.type=kafka > connector.version=universal > format=json > schema.0.data-type=VARCHAR(2147483647) > schema.0.name=createTime > schema.1.data-type=BIGINT > schema.1.name=orderId > schema.2.data-type=DOUBLE > schema.2.name=payAmount > schema.3.data-type=INT > schema.3.name=payPlatform > schema.4.data-type=INT > schema.4.name=provinceId > > The following factories have been considered: > org.apache.flink.table.sources.CsvBatchTableSourceFactory > org.apache.flink.table.sources.CsvAppendTableSourceFactory > at > org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:315) > at > org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:193) > at > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:154) > at > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:108) > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:41) > ... 59 more > > while executing the following DDL query > > create_kafka_source_ddl = """ > CREATE TABLE payment_msg( > createTime VARCHAR, > orderId BIGINT, > payAmount DOUBLE, > payPlatform INT, > provinceId INT > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'payment_msg', > 'connector.properties.bootstrap.servers' = 'kafka:9092', > 'connector.properties.group.id' = 'test_3', > 'connector.startup-mode' = 'latest-offset', > 'format.type' = 'json' > ) > """ > > Not sure, why its not looking for JSON parsing factory and going toward > deprecated csv parse factory. Anybody who can help with this? > I would really appreciate it. > > Thanks, > Shubham > >
