[jira] [Commented] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table
[ https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358996#comment-17358996 ]

Spongebob commented on FLINK-22874:
-----------------------------------

Yes, this issue was caused by my misunderstanding of the streaming partition sink. It worked normally after I enabled checkpointing.

> flink table partition trigger doesn't effect as expectation when sink into hive table
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-22874
>                 URL: https://issues.apache.org/jira/browse/FLINK-22874
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.13.1
>            Reporter: Spongebob
>            Priority: Major
>
> I am trying to sink into a partitioned Hive table whose partition commit trigger is declared as "partition-time", and I have assigned watermarks on the DataStream. When I input some data into the DataStream, the Hive partitions are not committed on time. Here is my code:
> {code:java}
> // DDL of the Hive table
> create table test_table(username string)
> partitioned by (ts bigint)
> stored as orc
> TBLPROPERTIES (
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.policy.kind'='metastore,success-file'
> );{code}
> {code:java}
> // Flink application code
> val streamEnv = ...
> val dataStream: DataStream[(String, Long)] = ...
> // assign watermarks and output the current watermark in a ProcessFunction
> class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] {
>   override def processElement(value: (String, Long),
>                               ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context,
>                               out: Collector[(String, Long, Long)]): Unit = {
>     out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
>   }
> }
> val resultStream = dataStream
>   .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
>     .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
>       override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
>         element._2 * 1000
>       }
>     }))
>   .process(new MyProcessFunction)
> //
> val streamTableEnv = buildStreamTableEnv(streamEnv,
>   EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
> // register the DataStream as a temporary view and sink it into Hive
> streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
> val catalog = ...
> streamTableEnv.registerCatalog("hive", catalog)
> streamTableEnv.useCatalog("hive")
> streamTableEnv.executeSql("insert into test_table select _1,_2 from default_catalog.default_database.test_catalog_t").print()
> // Flink uses the default parallelism of 4
> // input data
> (a, 1)
> (b, 2)
> (c, 3)
> (d, 4)
> (a, 5)
> ...
> // result
> // There are many partition directories on HDFS, but they contain only
> // in-progress files and are never committed to the Hive metastore.{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table
[ https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358290#comment-17358290 ]

luoyuxia commented on FLINK-22874:
----------------------------------

[~SpongebobZ] Hi, have you enabled checkpointing? Flink only closes in-progress files during a checkpoint and commits them after the checkpoint completes.
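The resolution discussed in this thread (enabling checkpointing so that in-progress files can be closed and committed) could look roughly like the following. This is a minimal sketch, not the reporter's actual fix: the object name `EnableCheckpointingSketch` and the 60-second interval are assumptions chosen for illustration, and the rest of the pipeline is left elided as in the report.

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical wrapper object, used only to make the sketch self-contained.
object EnableCheckpointingSketch {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: a 60-second interval, picked only as an example value.
    // Without checkpointing, the streaming sink never closes its in-progress
    // files, so no partition can be committed to the Hive metastore.
    streamEnv.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)

    // ... build the DataStream, register the Hive catalog, and run the
    // INSERT statement as in the issue description.
  }
}
```

Because files are only committed once a checkpoint completes, the checkpoint interval is a lower bound on how quickly written data can become visible in a committed partition, in addition to whatever the `partition-time` trigger requires of the watermark.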