Hi, 李轲

这是因为yml只支持1.10之前老的connector,写法是connector.type=‘filesystem’, 1.11之后的新connector都是 
connetor=‘filesystem’, 除了简化了写法外,前者的工厂方法和后者的也不一样, 所以通过yml定义的新的connector是不能被老的工厂 
SPI 发现的。而在yml中定义表从1.11开始就是不推荐了,因为已经支持了用DDL这种纯SQL的方式定义表。

推荐你可以拉起sql-client后,用DDL的方式建表

祝好
Leonard



> 在 2020年12月1日,21:43,李轲 <[email protected]> 写道:
> 
> 在服务器上试用sql-client时,启动指令如下:
> 
> ./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d 
> /data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml  -e 
> /root/flink-sql-client/sql-client-demo.yml
> 
> 配置如下:
> 
> # 定义表
> tables:
>   - name: SourceTable
>     type: source-table
>     update-mode: append
>     connector:
>       type: datagen
>       rows-per-second: 5
>       fields:
>         f_sequence:
>           kind: sequence
>           start: 1
>           end: 1000
>         f_random:
>           min: 1
>           max: 1000
>         f_random_str:
>           length: 10
>     schema:
>       - name: f_sequence
>         data-type: INT
>       - name: f_random
>         data-type: INT
>       - name: f_random_str
>         data-type: STRING
> 
> 遇到了如下报错:
> 
> Reading default environment from: 
> file:/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml
> Reading session environment from: 
> file:/root/flink-sql-client/sql-client-demo.yml
> 
> 
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>       at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
> not create execution context.
>       at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
>       at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
>       at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
>       at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> 
> Reason: Required context properties mismatch.
> 
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Missing properties:
> format.type=csv
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'datagen'
> 
> The following properties are requested:
> connector.fields.f_random.max=1000
> connector.fields.f_random.min=1
> connector.fields.f_random_str.length=10
> connector.fields.f_sequence.end=1000
> connector.fields.f_sequence.kind=sequence
> connector.fields.f_sequence.start=1
> connector.rows-per-second=5
> connector.type=datagen
> schema.0.data-type=INT
> schema.0.name=f_sequence
> schema.1.data-type=INT
> schema.1.name=f_random
> schema.2.data-type=STRING
> schema.2.name=f_random_str
> update-mode=append
> 
> The following factories have been considered:
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.filesystem.FileSystemTableFactory
>       at 
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>       at 
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>       at 
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>       at 
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
>       at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384)
>       at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638)
>       at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>       at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636)
>       at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
>       at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183)
>       at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136)
>       at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
>       ... 3 more
> 
> 看描述是有包找不到,到我看官网上说 json 的解析 jar 在 sql-client 中包含啊,试用 sql-client 
> 也需要自己导包的么?哪里有更详细的资料,求指点,谢谢
> 
> 
>  
> 
> 
>  

回复