>        |INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, `p_month` = p_month)
>        |select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4

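Dynamic partitions are not specified this way. The syntax is the same as Hive's: leave the dynamic partition columns out of the partition clause and let the select supply them. Either of the two statements below should work. The Flink documentation is a bit thin on this; see [1] and [2].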

INSERT INTO dwdCatalog.dwd.t1_copy 
     select id,name,`p_year`,`p_month` from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4

INSERT INTO dwdCatalog.dwd.t1_copy 
    select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4 
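
A third variant, for reference: Flink's INSERT syntax also allows fixing only the leading partition column with a constant while leaving the rest dynamic. The partition clause accepts only literal values, which is why the parser rejected `p_year` = p_year. A sketch against the same tables (not run here):

```sql
-- p_year is static: a constant in the PARTITION clause.
-- p_month is dynamic: it is taken from the last column of the SELECT.
INSERT INTO dwdCatalog.dwd.t1_copy PARTITION (`p_year` = 2020)
    select id, name, `p_month` from dwdCatalog.dwd.t1
    where `p_year` = 2020 and `p_month` = 4
```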

Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/insert.html#examples
[2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java#L294



> On May 28, 2020, at 13:59, Zhou Zach <[email protected]> wrote:
> 
> Thanks for the pointer, that works now.
> But after switching to a dynamic partition insert, I get an error:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL parse failed. Encountered "p_year" at line 3, column 58.
> Was expecting one of:
>    "DATE" ...
>    "FALSE" ...
>    "INTERVAL" ...
>    "NULL" ...
>    "TIME" ...
>    "TIMESTAMP" ...
>    "TRUE" ...
>    "UNKNOWN" ...
>    <UNSIGNED_INTEGER_LITERAL> ...
>    <APPROX_NUMERIC_LITERAL> ...
>    <DECIMAL_NUMERIC_LITERAL> ...
>    <BINARY_STRING_LITERAL> ...
>    <QUOTED_STRING> ...
>    <PREFIXED_STRING_LITERAL> ...
>    <UNICODE_STRING_LITERAL> ...
>    <LBRACE_D> ...
>    <LBRACE_T> ...
>    <LBRACE_TS> ...
>    "+" ...
>    "-" ...
> 
> 
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> 
> 
> 
> 
> Query:
> tableEnv.sqlUpdate(
>      """
>        |
>        |INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, `p_month` = p_month)
>        |select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4
>        |
>        |""".stripMargin)
> 
> At 2020-05-28 13:39:49, "Leonard Xu" <[email protected]> wrote:
>> Hi,
>>>       |select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 5
>> 
>> Most likely select * also returns the partition columns, so the query's fields no longer match the sink. List only the columns you need in the select.
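>> 
>> For example, a sketch of the static-partition insert with an explicit column list, using the same tables as in the quoted mail (not run here). The EXPR$4 and EXPR$5 columns in the reported Query schema look like the constants from the partition clause appended after the four columns that select * returns:
>> 
>> ```sql
>> -- Static partitions: both partition values are constants in the PARTITION clause,
>> -- so the SELECT supplies only the non-partition columns id and name.
>> INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = 2020, `p_month` = 5)
>> select id, name from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 5
>> ```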
>> 
>> Best,
>> Leonard Xu
>> 
>>> On May 28, 2020, at 12:57, Zhou Zach <[email protected]> wrote:
>>> 
>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink dwdCatalog.dwd.t1_copy do not match.
>>> 
>>> Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: INT NOT NULL, EXPR$5: INT NOT NULL]
>>> 
>>> Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]
>>> 
>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>>> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>> 
>>> 
>>> 
>>> 
>>> Hive partitioned tables:
>>> CREATE TABLE `dwd.t1`(
>>> `id` bigint, 
>>> `name` string)
>>> PARTITIONED BY ( 
>>> `p_year` int, 
>>> `p_month` int)
>>> 
>>> 
>>> CREATE TABLE `dwd.t1_copy`(
>>> `id` bigint, 
>>> `name` string)
>>> PARTITIONED BY ( 
>>> `p_year` int, 
>>> `p_month` int)
>>> 
>>> 
>>> Flink sql:
>>> tableEnv.sqlUpdate(
>>>     """
>>>       |
>>>       |INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = 2020, `p_month` = 5)
>>>       |select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 5
>>>       |
>>>       |""".stripMargin)
>>> 
>>> 
>>> thanks for your help
