Hi Yik San,

Maybe you could use a watermark to trigger the last flush. When a source
operator terminates, it emits MAX_WATERMARK, which fires all pending
event-time timers (see [1]).
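
A rough sketch of the idea, in plain Java rather than real Flink code, so the
class and method names below (BufferingSinkSketch, processElement,
processWatermark) are made up for illustration. The only Flink fact it relies
on is that MAX_WATERMARK carries the timestamp Long.MAX_VALUE. How you hook
this into an actual job (for example with an event-time timer in a keyed
process function) depends on your setup, so treat it as an assumption:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, NOT Flink API: a buffering sink that flushes when the
// buffer reaches a threshold, and once more when the end-of-input watermark arrives.
class BufferingSinkSketch {
    // Flink's Watermark.MAX_WATERMARK carries this timestamp.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final int threshold;
    private final List<String> bufferedElements = new ArrayList<>();
    // Stand-in for the external system; each inner list is one flushed batch.
    final List<List<String>> flushedBatches = new ArrayList<>();

    BufferingSinkSketch(int threshold) {
        this.threshold = threshold;
    }

    // Hypothetical per-record hook: buffer the record, flush on a full batch.
    void processElement(String value) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            flush();
        }
    }

    // Hypothetical watermark hook: only the final watermark forces a flush.
    void processWatermark(long timestamp) {
        if (timestamp == MAX_WATERMARK) {
            flush();
        }
    }

    private void flush() {
        if (bufferedElements.isEmpty()) {
            return; // nothing left over, so no empty batch is written
        }
        flushedBatches.add(new ArrayList<>(bufferedElements));
        bufferedElements.clear();
    }

    public static void main(String[] args) {
        BufferingSinkSketch sink = new BufferingSinkSketch(3);
        for (String record : new String[] {"a", "b", "c", "d", "e"}) {
            sink.processElement(record);
        }
        sink.processWatermark(MAX_WATERMARK); // end of the bounded input
        System.out.println(sink.flushedBatches); // prints [[a, b, c], [d, e]]
    }
}
```

With this shape there is no need for an isLastRecord() check: the partial
batch is written when the final watermark arrives, not when a particular
record is seen.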

[1] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

Best,
Paul Lam

> On June 30, 2021, at 10:38, Yik San Chan <evan.chanyik...@gmail.com> wrote:
> 
> Hi community,
> 
> I have a batch job that consumes records from a bounded source (e.g., Hive)
> and passes them through a BufferingSink as described in the
> [docs](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction).
> In the BufferingSink, I want to flush records to the sink in 1000-record
> batches.
> 
> Since the source is bounded, I need to flush all remaining records when the
> input ends; otherwise the records still buffered (in the variable
> bufferedElements) will be lost.
> 
> An obvious way of doing so is to flush out all records in the `close` method. 
> That should work fine.
> 
> However, I wonder whether it is possible to tell, inside the `invoke`
> method, whether a record is the last one. In other words, how would I
> implement the `isLastRecord` method below?
> 
> ```java
> @Override
> public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
>     bufferedElements.add(value);
>     if (bufferedElements.size() == threshold || isLastRecord()) {
>         for (Tuple2<String, Integer> element : bufferedElements) {
>             // send it to the sink
>         }
>         bufferedElements.clear();
>     }
> }
> ```
> 
> Thanks!
> 
> Best,
> Yik San
