`format` and `schema` should be at the same level. For reference, see the configuration of TableNumber1 in the flink-sql-client tests: test-sql-client-defaults.yaml
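For the env.yaml quoted below, that means pulling `schema` out from under `format` so both are direct children of the table entry. A minimal sketch of the corrected table definition (same fields as the config in this thread):

```yaml
tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/home/hadoop/flink_test/input.csv"
    format:                  # how the file is parsed
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    schema:                  # sibling of 'format', NOT nested inside it
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR
```

With `schema` nested under `format`, the SQL Client flattens it to `format.schema.#.type`, which the CSV factory rejects; as a sibling it becomes `schema.#.type`, which is supported.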
*Best Regards,*
*Zhenghua Gao*

On Mon, Apr 1, 2019 at 4:09 PM 曾晓勇 <[email protected]> wrote:

> @[email protected]
>
> Thanks. I checked and corrected the formatting issue mentioned earlier. Now,
> running the latest version downloaded directly from the Flink website, it
> fails at startup with the error below. I am currently trying to see whether
> a few production scripts can be ported directly to Flink SQL instead of
> going through the Java API; rewriting them as programs would be a large effort.
> Flink version: flink-1.7.2-bin-hadoop28-scala_2.11
> Launch command: /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded -e
> /home/hadoop/flink_test/env.yaml
>
> [hadoop@server2 bin]$ /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> -e /home/hadoop/flink_test/env.yaml
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> No default environment specified.
> Searching for '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> Reading session environment from: file:/home/hadoop/flink_test/env.yaml
> Validating current environment...
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: The configured
> environment is invalid. Please check your environment files again.
>         at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
>         at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
>         at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
>         at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
>         ... 2 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.StreamTableSourceFactory' in
> the classpath.
>
> Reason: The matching factory
> 'org.apache.flink.table.sources.CsvAppendTableSourceFactory' doesn't
> support 'format.schema.#.type'.
>
> Supported properties of this factory are:
> connector.path
> format.comment-prefix
> format.field-delimiter
> format.fields.#.name
> format.fields.#.type
> format.ignore-first-line
> format.ignore-parse-errors
> format.line-delimiter
> format.quote-character
> schema.#.name
> schema.#.type
>
> The following properties are requested:
> connector.path=/home/hadoop/flink_test/input.csv
> connector.type=filesystem
> format.comment-prefix=#
> format.fields.0.name=MyField1
> format.fields.0.type=INT
> format.fields.1.name=MyField2
> format.fields.1.type=VARCHAR
> format.line-delimiter=\n
> format.schema.0.name=MyField1
> format.schema.0.type=INT
> format.schema.1.name=MyField2
> format.schema.1.type=VARCHAR
> format.type=csv
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>
>         at org.apache.flink.table.factories.TableFactoryService$.filterBySupportedProperties(TableFactoryService.scala:277)
>         at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:136)
>         at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
>         at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
>         at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
>         at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
>         at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>         at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
>         at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
>         ... 4 more
> [hadoop@server2 bin]$
>
> -- The configuration after fixing the earlier formatting issue:
> tables:
>   - name: MyTableSource
>     type: source-table
>     update-mode: append
>     connector:
>       type: filesystem
>       path: "/home/hadoop/flink_test/input.csv"
>     format:
>       type: csv
>       fields:
>         - name: MyField1
>           type: INT
>         - name: MyField2
>           type: VARCHAR
>       line-delimiter: "\n"
>       comment-prefix: "#"
>       schema:
>         - name: MyField1
>           type: INT
>         - name: MyField2
>           type: VARCHAR
>   - name: MyCustomView
>     type: view
>     query: "SELECT MyField2 FROM MyTableSource"
>
> # Execution properties allow for changing the behavior of a table program.
> execution:
>   type: streaming                    # required: execution mode, either 'batch' or 'streaming'
>   result-mode: table                 # required: either 'table' or 'changelog'
>   max-table-result-rows: 1000000     # optional: maximum number of maintained rows in 'table'
>                                      # mode (1000000 by default, smaller than 1 means unlimited)
>   time-characteristic: event-time    # optional: 'processing-time' or 'event-time' (default)
>   parallelism: 1                     # optional: Flink's parallelism (1 by default)
>   periodic-watermarks-interval: 200  # optional: interval for periodic watermarks (200 ms by default)
>   max-parallelism: 16                # optional: Flink's maximum parallelism (128 by default)
>   min-idle-state-retention: 0        # optional: table program's minimum idle state time
>   max-idle-state-retention: 0        # optional: table program's maximum idle state time
>   restart-strategy:                  # optional: restart strategy
>     type: fallback                   # "fallback" to global restart strategy by default
>
> # Deployment properties allow for describing the cluster to which table programs are submitted.
> deployment:
>   response-timeout: 5000
>
>
> ------------------ Original message ------------------
> From: "浪人" <[email protected]>
> Sent: Monday, April 1, 2019, 3:46 PM
> To: "user-zh" <[email protected]>
> Subject: Re: [Flink SQL] Unable to start with env.yaml
>
> The format looks fine.
>
> ------------------ Original message ------------------
> From: "Zhenghua Gao" <[email protected]>
> Sent: Monday, April 1, 2019, 3:40 PM
> To: "user-zh" <[email protected]>
> Subject: Re: [Flink SQL] Unable to start with env.yaml
>
> The YAML format is invalid; it looks like an indentation problem.
> You can verify the file with an online YAML editor, e.g. [1].
> For more on the YAML format, see [2][3].
>
> [1] http://nodeca.github.io/js-yaml/
> [2] http://www.ruanyifeng.com/blog/2016/07/yaml.html
> [3] https://en.wikipedia.org/wiki/YAML
>
> *Best Regards,*
> *Zhenghua Gao*
>
> On Mon, Apr 1, 2019 at 11:51 AM 曾晓勇 <[email protected]> wrote:
>
> > Hi all,
> >
> > Today Flink SQL failed to start during testing; the error log is below.
> > What should I watch out for in the format of the YAML configuration file?
> > Can the delimiter be a special character such as the '\036' separator
> > used in Hive CREATE TABLE statements? The full error log follows.
> >
> > [root@server2 bin]# /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> > -e /home/hadoop/flink_test/env.yaml
> > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> > No default environment specified.
> > Searching for '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> > Reading default environment from:
> > file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> > Reading session environment from: file:/home/hadoop/flink_test/env.yaml
> >
> > Exception in thread "main"
> > org.apache.flink.table.client.SqlClientException: Could not parse
> > environment file.
> > Cause: YAML decoding problem: while parsing a block collection
> >  in 'reader', line 2, column 2:
> >      - name: MyTableSource
> >      ^
> > expected <block end>, but found BlockMappingStart
> >  in 'reader', line 17, column 3:
> >       schema:
> >       ^
> > (through reference chain:
> > org.apache.flink.table.client.config.Environment["tables"])
> >         at org.apache.flink.table.client.config.Environment.parse(Environment.java:146)
> >         at org.apache.flink.table.client.SqlClient.readSessionEnvironment(SqlClient.java:162)
> >         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:90)
> >         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
> >
> > -- The configuration file env.yaml:
> > tables:
> >  - name: MyTableSource
> >    type: source-table
> >    update-mode: append
> >    connector:
> >      type: filesystem
> >      path: "/home/hadoop/flink_test/input.csv"
> >    format:
> >      type: csv
> >      fields:
> >        - name: MyField1
> >          type: INT
> >        - name: MyField2
> >          type: VARCHAR
> >      line-delimiter: "\n"
> >      comment-prefix: "#"
> >   schema:
> >      - name: MyField1
> >        type: INT
> >      - name: MyField2
> >        type: VARCHAR
> >  - name: MyCustomView
> >    type: view
> >    query: "SELECT MyField2 FROM MyTableSource"
> >
> > # Execution properties allow for changing the behavior of a table program.
> > execution:
> >   type: streaming                   # required: execution mode, either 'batch' or 'streaming'
> >   result-mode: table                # required: either 'table' or 'changelog'
> >   max-table-result-rows: 1000000    # optional: maximum number of maintained rows in 'table'
> >                                     # mode (1000000 by default, smaller than 1 means unlimited)
> >   time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
> >   parallelism: 1                    # optional: Flink's parallelism (1 by default)
> >   periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
> >   max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
> >   min-idle-state-retention: 0       # optional: table program's minimum idle state time
> >   max-idle-state-retention: 0       # optional: table program's maximum idle state time
> >   restart-strategy:                 # optional: restart strategy
> >     type: fallback                  # "fallback" to global restart strategy by default
> >
> > # Deployment properties allow for describing the cluster to which table programs are submitted.
> > deployment:
> >   response-timeout: 5000
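On the earlier question about Hive-style '\036' separators: octal 036 is the ASCII record separator (0x1E), and YAML's double-quoted style can express it with a `\x` escape. A hedged sketch of what the format section could look like (untested; whether the CSV table format accepts a non-printable single-character delimiter should be verified against the Flink connector documentation for your version):

```yaml
format:
  type: csv
  field-delimiter: "\x1e"   # ASCII 0x1E = octal 036; '\x' escape is only valid in double-quoted scalars
```

The single-quoted style does not process escapes, so '\x1e' there would be the literal four characters.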
