@[email protected]       
??????????????????????????????????????????????????flink??????????????????????????????????????????????????????????????????????????????????????????FLINKSQL
 ??????java??????????????????????????????
     FLINK ????:flink-1.7.2-bin-hadoop28-scala_2.11
     ????????:/home/hadoop/flink-1.7.2/bin/sql-client.sh embedded -e 
/home/hadoop/flink_test/env.yaml


[hadoop@server2 bin]$ /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded -e 
/home/hadoop/flink_test/env.yaml
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
No default environment specified.
Searching for '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: 
file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
Reading session environment from: file:/home/hadoop/flink_test/env.yaml
Validating current environment...


Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
The configured environment is invalid. Please check your environment files 
again.
        at 
org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
        at 
org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
        ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.


Reason: 
The matching factory 
'org.apache.flink.table.sources.CsvAppendTableSourceFactory' doesn't support 
'format.schema.#.type'.


Supported properties of this factory are:
connector.path
connector.path
format.comment-prefix
format.field-delimiter
format.fields.#.name
format.fields.#.type
format.ignore-first-line
format.ignore-parse-errors
format.line-delimiter
format.quote-character
schema.#.name
schema.#.type


The following properties are requested:
connector.path=/home/hadoop/flink_test/input.csv
connector.type=filesystem
format.comment-prefix=#
format.fields.0.name=MyField1
format.fields.0.type=INT
format.fields.1.name=MyField2
format.fields.1.type=VARCHAR
format.line-delimiter=\n
format.schema.0.name=MyField1
format.schema.0.type=INT
format.schema.1.name=MyField2
format.schema.1.type=VARCHAR
format.type=csv
update-mode=append


The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory


        at 
org.apache.flink.table.factories.TableFactoryService$.filterBySupportedProperties(TableFactoryService.scala:277)
        at 
org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:136)
        at 
org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
        at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
        at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
        ... 4 more
[hadoop@server2 bin]$ 











-- ??????????????????
tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/home/hadoop/flink_test/input.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    # FIX: 'schema' is a table-level property — a sibling of 'connector' and
    # 'format' — not a child of 'format'. Nesting it under 'format' yields the
    # property key 'format.schema.#.type', which CsvAppendTableSourceFactory
    # rejects (see the NoMatchingTableFactoryException above: only
    # 'schema.#.name' / 'schema.#.type' are supported).
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR
  - name: MyCustomView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"

# Execution properties allow for changing the behavior of a table program.
execution:
  type: streaming                    # required: execution mode, 'batch' or 'streaming'
  result-mode: table                 # required: either 'table' or 'changelog'
  # optional: maximum number of maintained rows in 'table' mode
  # (1000000 by default; smaller than 1 means unlimited)
  max-table-result-rows: 1000000
  time-characteristic: event-time    # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                     # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200  # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16                # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0        # optional: table program's minimum idle state time
  max-idle-state-retention: 0        # optional: table program's maximum idle state time
  restart-strategy:                  # optional: restart strategy
    type: fallback                   # "fallback" to the global restart strategy by default

# Deployment properties allow for describing the cluster to which
# table programs are submitted.
deployment:
  response-timeout: 5000





------------------ ???????? ------------------
??????: "????"<[email protected]>;
????????: 2019??4??1??(??????) ????3:46
??????: "user-zh"<[email protected]>;

????: ?????? ??Flink SQL??????????env.yaml



??????????


------------------ ???????? ------------------
??????: "Zhenghua Gao"<[email protected]>;
????????: 2019??4??1??(??????) ????3:40
??????: "user-zh"<[email protected]>;

????: Re: ??Flink SQL??????????env.yaml



yaml??????????????????????????????
??????????????yaml???????????????? ???? [1]
????yaml???????????????? [2][3]

[1] http://nodeca.github.io/js-yaml/
[2] http://www.ruanyifeng.com/blog/2016/07/yaml.html
[3] https://en.wikipedia.org/wiki/YAML

*Best Regards,*
*Zhenghua Gao*


On Mon, Apr 1, 2019 at 11:51 AM ?????? <[email protected]> wrote:

> ????????
>
>        ??????????Flink SQL 
> ??????????????????????????????????yaml????????????????????????,????????????????????????????
> hive??????????????????'\036'????????????????????
>
> [root@server2 bin]# /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> -e /home/hadoop/flink_test/env.yaml
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
> set.
> No default environment specified.
> Searching for
> '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> Reading session environment from: file:/home/hadoop/flink_test/env.yaml
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Could not parse
> environment file. Cause: YAML decoding problem: while parsing a block
> collection
>  in 'reader', line 2, column 2:
>      - name: MyTableSource
>      ^
> expected <block end>, but found BlockMappingStart
>  in 'reader', line 17, column 3:
>       schema:
>       ^
>  (through reference chain:
> org.apache.flink.table.client.config.Environment["tables"])
>         at
> org.apache.flink.table.client.config.Environment.parse(Environment.java:146)
>         at
> org.apache.flink.table.client.SqlClient.readSessionEnvironment(SqlClient.java:162)
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:90)
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
>
>
>
>
> --????????env.yaml
> tables:
>  - name: MyTableSource
>    type: source-table
>    update-mode: append
>    connector:
>      type: filesystem
>      path: "/home/hadoop/flink_test/input.csv"
>    format:
>     type: csv
>     fields:
>         - name: MyField1
>           type: INT
>         - name: MyField2
>           type: VARCHAR
>     line-delimiter: "\n"
>     comment-prefix: "#"
>   schema:
>         - name: MyField1
>         type: INT
>         - name: MyField2
>         type: VARCHAR
>  - name: MyCustomView
>    type: view
>    query: "SELECT MyField2 FROM MyTableSource"
> # Execution properties allow for changing the behavior of a table program.
> execution:
>  type: streaming # required: execution mode either 'batch' or 'streaming'
>  result-mode: table # required: either 'table' or 'changelog'
>  max-table-result-rows: 1000000 # optional: maximum number of maintained
> rows in
>  # 'table' mode (1000000 by default, smaller 1 means unlimited)
>  time-characteristic: event-time # optional: 'processing-time' or
> 'event-time' (default)
>  parallelism: 1 # optional: Flink's parallelism (1 by default)
>  periodic-watermarks-interval: 200 # optional: interval for periodic
> watermarks(200 ms by default)
>  max-parallelism: 16 # optional: Flink's maximum parallelism (128by
> default)
>  min-idle-state-retention: 0 # optional: table program's minimum idle
> state time
>  max-idle-state-retention: 0 # optional: table program's maximum idle
> state time
>  restart-strategy: # optional: restart strategy
>    type: fallback # "fallback" to global restart strategy by
> default
> # Deployment properties allow for describing the cluster to which table
> programsare submitted to.
> deployment:
>   response-timeout: 5000
>
>

回复