Could someone help me take a look at this problem? Thanks.

[image: image.png]
[image: image.png]

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.properties.bootstrap.servers=ip-10-128-145-1.idata-server.shopee.io:9092
connector.properties.group.id=keystats_aripay
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=ebisu_wallet_id_db_mirror_v1
connector.type=kafka
format.property-version=1
format.type=json
schema.0.data-type=INT
schema.0.name=ts
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=table
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=database
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
    ... 39 more
