Hi Yik San,

Maybe you could use the watermark to trigger the last flush. When they terminate, source operators emit MAX_WATERMARK (timestamp Long.MAX_VALUE), which fires all pending event-time timers downstream (see [1]).
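Here is a minimal plain-Java sketch of the idea. It assumes the buffering logic is driven by something that can observe watermarks. Records are flushed every `threshold` elements, and whatever is left is flushed once the watermark reaches `Long.MAX_VALUE`, the timestamp of Flink's `Watermark.MAX_WATERMARK`. `WatermarkFlushingBuffer`, `onWatermark` and the `flusher` callback are hypothetical names. The class uses no Flink API and only models the flush logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical plain-Java model (not a Flink class): buffers records,
 * flushes every `threshold` records, and flushes the remainder when the
 * end-of-input watermark (Long.MAX_VALUE) arrives.
 */
class WatermarkFlushingBuffer<T> {
    // Same timestamp as Flink's Watermark.MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final int threshold;
    private final Consumer<List<T>> flusher; // hypothetical "send it to the sink" callback
    private final List<T> bufferedElements = new ArrayList<>();

    WatermarkFlushingBuffer(int threshold, Consumer<List<T>> flusher) {
        this.threshold = threshold;
        this.flusher = flusher;
    }

    /** Called once per record, like SinkFunction#invoke. */
    void invoke(T value) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            flush();
        }
    }

    /** Called for every watermark; MAX_WATERMARK means no more records will arrive. */
    void onWatermark(long timestamp) {
        if (timestamp == MAX_WATERMARK) {
            flush();
        }
    }

    /** Number of records still waiting to be flushed. */
    int buffered() {
        return bufferedElements.size();
    }

    private void flush() {
        if (bufferedElements.isEmpty()) {
            return;
        }
        // Hand over a copy so that clearing the buffer does not affect the sink.
        flusher.accept(new ArrayList<>(bufferedElements));
        bufferedElements.clear();
    }
}
```

In a real job (again an assumption, not tested here), the `onWatermark` call could live in a custom operator's `processWatermark(Watermark mark)` override, which should still forward the watermark. Another option is a `KeyedProcessFunction` that registers an event-time timer at `Long.MAX_VALUE` and flushes in `onTimer`.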
[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

Best,
Paul Lam

> On 30 Jun 2021, at 10:38, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> Hi community,
>
> I have a batch job that consumes records from a bounded source (e.g., Hive) and walks them through a BufferingSink as described in the [docs](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction). In the BufferingSink, I want to flush records out to the sink in 1000-record batches.
>
> Since the source is bounded, I need to flush out all remaining records when it reaches the end. Otherwise the records still buffered (in the variable bufferedElements) will be lost.
>
> An obvious way of doing so is to flush out all records in the `close` method. That should work fine.
>
> However, I wonder whether it's possible to tell, inside the `invoke` method, if a record is the last one. In other words, how would I implement the `isLastRecord` method below?
>
> ```java
> @Override
> public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
>     bufferedElements.add(value);
>     if (bufferedElements.size() == threshold || isLastRecord()) {
>         for (Tuple2<String, Integer> element : bufferedElements) {
>             // send it to the sink
>         }
>         bufferedElements.clear();
>     }
> }
> ```
>
> Thanks!
>
> Best,
> Yik San