Hi Piotr,

Yeah, thanks a lot for your help. For future reference, what I did was simply:

1. Copy the whole BufferingSink as in the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
2. In its `invoke` method, batch-write everything in `bufferedElements` into Redis using Redis pipelining.

That's pretty much it.
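Here is a rough sketch of the idea, adapted from the BufferingSink in the docs above. It assumes Jedis as the Redis client and String key/value records; the class name `RedisBufferingSink`, the connection settings, and the `flush()` helper are illustrative rather than my exact code:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class RedisBufferingSink extends RichSinkFunction<Tuple2<String, String>>
        implements CheckpointedFunction {

    private final int threshold;

    private transient Jedis jedis;
    private transient ListState<Tuple2<String, String>> checkpointedState;
    private final List<Tuple2<String, String>> bufferedElements;

    public RedisBufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void open(Configuration parameters) {
        // Connection settings are illustrative.
        jedis = new Jedis("localhost", 6379);
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            flush();
        }
    }

    // Write the whole buffer to Redis in a single pipelined round trip.
    private void flush() {
        if (bufferedElements.isEmpty()) {
            return;
        }
        Pipeline pipeline = jedis.pipelined();
        for (Tuple2<String, String> element : bufferedElements) {
            pipeline.set(element.f0, element.f1);
        }
        pipeline.sync();
        bufferedElements.clear();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist any not-yet-flushed records so they survive a failure/restore.
        checkpointedState.clear();
        for (Tuple2<String, String> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, String>> descriptor =
                new ListStateDescriptor<>(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            for (Tuple2<String, String> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }

    @Override
    public void close() {
        if (jedis != null) {
            flush();
            jedis.close();
        }
    }
}
```

Flushing on a size threshold keeps each pipelined round trip bounded, and snapshotting the leftover buffer into operator state means records buffered at checkpoint time are not lost on restore.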
Thank you for your help again!

On Thu, Jul 8, 2021 at 11:09 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Great, thanks for coming back, and I'm glad that it works for you!
>
> Piotrek
>
> Thu, 8 Jul 2021 at 13:34 Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
>> Hi Piotr,
>>
>> Thanks! I ended up doing option 1, and that works great.
>>
>> Best,
>> Yik San
>>
>> On Tue, May 25, 2021 at 11:43 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> You could always buffer records in your sink function/operator until a large enough batch is accumulated, and then upload the whole batch at once. Note that if you want at-least-once or exactly-once semantics, you would need to take care of those buffered records in one way or another. For example, you could:
>>>
>>> 1. Buffer records in some in-memory data structure (not Flink's state), and just make sure that those records are flushed to the underlying sink on `CheckpointedFunction#snapshotState()` calls.
>>> 2. Buffer records in Flink's state (heap state backend or RocksDB; the heap state backend would be the fastest with little overhead, but you risk running out of memory), which would easily give you exactly-once. That way your batch could span multiple checkpoints.
>>> 3. Buffer/write records to temporary files, but in that case keep in mind that those files need to be persisted and recovered in case of failure and restart.
>>> 4. Ignore checkpointing, and either always restart the job from scratch or accept some occasional data loss.
>>>
>>> FYI, virtually every connector/sink batches writes internally to some extent, usually by doing option 1.
>>>
>>> Piotrek
>>>
>>> Tue, 25 May 2021 at 14:50 Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>
>>>> Hi community,
>>>>
>>>> I have a Hive table that stores tens of millions of rows of data. In my Flink job, I want to process the data in a batch manner:
>>>>
>>>> - Split the data into batches, each batch holding (maybe) 10,000 rows.
>>>> - For each batch, call a batchPut() API on my Redis client to dump the batch into Redis.
>>>>
>>>> Doing so in a streaming manner is not what I want, as that would cause too many round trips between the Flink workers and Redis.
>>>>
>>>> Is there a way to do that? I find little guidance in the Flink docs, since almost all the APIs feel better suited to streaming processing by default.
>>>>
>>>> Thank you!
>>>>
>>>> Best,
>>>> Yik San
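For concreteness, the pipelined batch write that the original question asks about (one round trip per ~10,000 rows instead of one per row) looks roughly like this with plain Jedis, outside of Flink. The host, port, key/value layout, and the stand-in `rows` map are all assumptions; the real data would come from the Hive table:

```java
import java.util.LinkedHashMap;
import java.util.Map;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class PipelinedBatchWrite {
    public static void main(String[] args) {
        // Stand-in for rows pulled from the Hive table; keys/values are illustrative.
        Map<String, String> rows = new LinkedHashMap<>();
        for (int i = 0; i < 25_000; i++) {
            rows.put("key:" + i, "value:" + i);
        }

        // Connection settings are assumptions.
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            Pipeline pipeline = jedis.pipelined();
            int batched = 0;
            for (Map.Entry<String, String> row : rows.entrySet()) {
                pipeline.set(row.getKey(), row.getValue());
                if (++batched % 10_000 == 0) {
                    pipeline.sync(); // one round trip per 10,000 queued commands
                }
            }
            pipeline.sync(); // flush the final partial batch
        }
    }
}
```

Each `sync()` call sends the queued commands in a single network round trip, which is what keeps the chatter between the Flink workers and Redis low.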