> |INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, `p_month` = p_month)
> |select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4
Dynamic partitions are not specified that way; the syntax is the same as Hive's. Either of the two statements below should work. Flink's documentation is a bit thin here; see [1][2] for reference.
INSERT INTO dwdCatalog.dwd.t1_copy
select id,name,`p_year`,`p_month` from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4
INSERT INTO dwdCatalog.dwd.t1_copy
select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4
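For contrast, a minimal sketch of the two partition-insert styles (reusing the catalog and table names from this thread; not verified against a live Hive catalog):

```sql
-- Static partition: the PARTITION clause takes literal values only,
-- and the select list must NOT include the partition columns.
INSERT INTO dwdCatalog.dwd.t1_copy PARTITION (`p_year` = 2020, `p_month` = 4)
SELECT id, name FROM dwdCatalog.dwd.t1
WHERE `p_year` = 2020 AND `p_month` = 4;

-- Dynamic partition: no PARTITION clause; the partition columns come
-- last in the select list and determine the target partition per row.
INSERT INTO dwdCatalog.dwd.t1_copy
SELECT id, name, `p_year`, `p_month` FROM dwdCatalog.dwd.t1
WHERE `p_year` = 2020 AND `p_month` = 4;
```

The failing statement mixed the two styles: `PARTITION (`p_year` = p_year)` puts a column reference where the parser expects a literal, which is why the parse error lists only literal tokens (<QUOTED_STRING>, <UNSIGNED_INTEGER_LITERAL>, ...) as what it was expecting.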
Best,
Leonard Xu
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/insert.html#examples
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java#L294
> On May 28, 2020, at 13:59, Zhou Zach <[email protected]> wrote:
>
> Thanks for the pointer, it works now.
> But after switching to a dynamic-partition insert, there's a problem:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: SQL parse failed. Encountered "p_year" at line 3, column 58.
> Was expecting one of:
> "DATE" ...
> "FALSE" ...
> "INTERVAL" ...
> "NULL" ...
> "TIME" ...
> "TIMESTAMP" ...
> "TRUE" ...
> "UNKNOWN" ...
> <UNSIGNED_INTEGER_LITERAL> ...
> <APPROX_NUMERIC_LITERAL> ...
> <DECIMAL_NUMERIC_LITERAL> ...
> <BINARY_STRING_LITERAL> ...
> <QUOTED_STRING> ...
> <PREFIXED_STRING_LITERAL> ...
> <UNICODE_STRING_LITERAL> ...
> <LBRACE_D> ...
> <LBRACE_T> ...
> <LBRACE_TS> ...
> "+" ...
> "-" ...
>
>
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
> Query:
> tableEnv.sqlUpdate(
> """
> |
> |INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, `p_month` = p_month)
> |select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4
> |
> |""".stripMargin)
>
> On 2020-05-28 13:39:49, "Leonard Xu" <[email protected]> wrote:
>> Hi,
>>> |select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 5
>>
>> Most likely select * brings the partition columns along, so the field types no longer match. List only the columns you need in the select.
>>
>> Best,
>> Leonard Xu
>>
>>> On May 28, 2020, at 12:57, Zhou Zach <[email protected]> wrote:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main method
>>> caused an error: Field types of query result and registered TableSink
>>> dwdCatalog.dwd.t1_copy do not match.
>>>
>>> Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4:
>>> INT NOT NULL, EXPR$5: INT NOT NULL]
>>>
>>> Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]
>>>
>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>>> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>
>>> Hive partitioned tables:
>>> CREATE TABLE `dwd.t1`(
>>> `id` bigint,
>>> `name` string)
>>> PARTITIONED BY (
>>> `p_year` int,
>>> `p_month` int)
>>>
>>>
>>> CREATE TABLE `dwd.t1_copy`(
>>> `id` bigint,
>>> `name` string)
>>> PARTITIONED BY (
>>> `p_year` int,
>>> `p_month` int)
>>>
>>>
>>> Flink sql:
>>> tableEnv.sqlUpdate(
>>> """
>>> |
>>> |INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = 2020, `p_month` = 5)
>>> |select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 5
>>> |
>>> |""".stripMargin)
>>>
>>>
>>> Thanks for your help!