Hi all, I'm running into a problem defining a Kafka source directly with DDL. The data in Kafka is JSON collected by Logstash.
The DDL is as follows:

CREATE TABLE vpn_source (
    c_real_ip VARCHAR,
    d_real_ip VARCHAR,
    c_real_port INT,
    d_real_port INT,
    logtype INT,
    `user` VARCHAR,
    host_ip VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'vpnlog',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'flink_test',
    'format.type' = 'json'
)

The error is:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.properties.bootstrap.servers=10.208.0.73:9092
connector.properties.group.id=flink_test
connector.properties.zookeeper.connect=10.208.0.73:2181
connector.topic=vpnlog
connector.type=kafka
connector.version=universal
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=c_real_ip
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=d_real_ip
schema.2.data-type=INT
schema.2.name=c_real_port
schema.3.data-type=INT
schema.3.name=d_real_port
schema.4.data-type=INT
schema.4.name=logtype
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=user
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=host_ip

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory

Flink environment: Flink 1.11 built locally from source, started as a local cluster via start-cluster.sh.
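In case it's relevant: the 1.11 documentation appears to use a new set of option keys for the Kafka connector instead of the legacy 'connector.*' properties. A sketch of the same table in that style, assuming the same topic and brokers (and that the flink-sql-connector-kafka jar is on the classpath), would be:

-- New-style (Flink 1.11) options; 'connector.version' and
-- 'zookeeper.connect' are no longer needed for the universal connector.
CREATE TABLE vpn_source (
    c_real_ip VARCHAR,
    d_real_ip VARCHAR,
    c_real_port INT,
    d_real_port INT,
    logtype INT,
    `user` VARCHAR,
    host_ip VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'vpnlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink_test',
    'format' = 'json'
)

Should I be using these options in 1.11, or is the legacy property set still supposed to work?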
