[jira] [Commented] (FLINK-22874) flink table partition trigger doesn't take effect as expected when sinking into hive table

2021-06-07 Thread Spongebob (Jira)


[ https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17358996#comment-17358996 ]

Spongebob commented on FLINK-22874:
---

Yes, my misunderstanding of the streaming partition sink caused this issue; it 
worked normally after I enabled checkpointing.
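
For reference, a minimal sketch of that fix as it would look in the application 
code quoted below; the 60-second interval is an illustrative assumption, not a 
value taken from this issue:

{code:java}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
// enable periodic checkpointing: the streaming file sink only closes
// in-progress files at a checkpoint, and the partition commit policies
// (metastore, success-file) run once that checkpoint completes
streamEnv.enableCheckpointing(60 * 1000) // interval in ms (illustrative)
{code}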

> flink table partition trigger doesn't take effect as expected when sinking 
> into hive table
> -
>
> Key: FLINK-22874
> URL: https://issues.apache.org/jira/browse/FLINK-22874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.1
>Reporter: Spongebob
>Priority: Major
>
> I am trying to sink into a hive partitioned table whose partition commit 
> trigger is declared as "partition-time", and I have assigned a watermark on 
> the dataStream. When I input some data into the dataStream, the hive 
> partitions are not committed on time. Here's my code:
> {code:sql}
> -- ddl of hive table
> create table test_table(username string)
> partitioned by (ts bigint)
> stored as orc
> TBLPROPERTIES (
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.policy.kind'='metastore,success-file'
> );{code}
> {code:java}
> // flink application code
> val streamEnv = ...
> val dataStream: DataStream[(String, Long)] = ...
>
> // assign watermark and output watermark info in a processFunction
> class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] {
>   override def processElement(
>       value: (String, Long),
>       ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context,
>       out: Collector[(String, Long, Long)]): Unit = {
>     out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
>   }
> }
>
> val resultStream = dataStream
>   .assignTimestampsAndWatermarks(
>     WatermarkStrategy.forBoundedOutOfOrderness[(String, Long)](Duration.ZERO)
>       .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
>         override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
>           element._2 * 1000
>         }
>       }))
>   .process(new MyProcessFunction)
>
> val streamTableEnv = buildStreamTableEnv(streamEnv,
>   EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
>
> // convert the dataStream into a hive catalog table and sink into hive
> streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
> val catalog = ...
> streamTableEnv.registerCatalog("hive", catalog)
> streamTableEnv.useCatalog("hive")
> streamTableEnv.executeSql("insert into test_table select _1, _2 from default_catalog.default_database.test_catalog_t").print()
>
> // flink uses the default parallelism of 4
>
> // input data
> (a, 1)
> (b, 2)
> (c, 3)
> (d, 4)
> (a, 5)
> ...
> // result
> there are many partition directories on hdfs, but they all contain only 
> in-progress files that are never committed to the hive metastore.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22874) flink table partition trigger doesn't take effect as expected when sinking into hive table

2021-06-06 Thread luoyuxia (Jira)


[ https://issues.apache.org/jira/browse/FLINK-22874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17358290#comment-17358290 ]

luoyuxia commented on FLINK-22874:
--

[~SpongebobZ] Hi, have you enabled checkpointing? Flink only closes 
in-progress files during a checkpoint and commits them after the checkpoint 
completes.
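
For context, a minimal sketch of that suggestion, reusing the streamEnv and 
streamTableEnv names from the code in the issue description. The option key is 
Flink's documented "execution.checkpointing.interval", and the interval values 
here are illustrative choices, not taken from this issue:

{code:java}
// option A: enable checkpointing on the stream environment
streamEnv.enableCheckpointing(60 * 1000)

// option B: set the checkpoint interval through the table configuration;
// with a non-zero interval, the hive sink closes in-progress files at each
// checkpoint and commits partitions once the checkpoint completes
streamTableEnv.getConfig.getConfiguration
  .setString("execution.checkpointing.interval", "1min")
{code}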



