Hi Piotr,

Yeah, thanks a lot for your help. For future reference, what I did was simply:

1. Copy the whole BufferingSink from the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
2. In its `invoke` method, batch-write everything in `bufferedElements` into
Redis using Redis pipelining (rough sketch below).
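
Here is a rough sketch of what that looks like, adapted from the docs'
BufferingSink. Note the Jedis client, the Tuple2<String, String> element type,
the connection details, and the flush threshold are illustrative placeholders,
not my exact code:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

import java.util.ArrayList;
import java.util.List;

// Adapted from the docs' BufferingSink; Jedis, the element type, the
// connection details, and the threshold are illustrative assumptions only.
public class RedisBufferingSink
        implements SinkFunction<Tuple2<String, String>>, CheckpointedFunction {

    private final int threshold;
    private transient ListState<Tuple2<String, String>> checkpointedState;
    private List<Tuple2<String, String>> bufferedElements;

    public RedisBufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            flushToRedis();
        }
    }

    // One pipelined round trip per batch instead of one round trip per record.
    private void flushToRedis() {
        try (Jedis jedis = new Jedis("localhost", 6379)) { // placeholder connection
            Pipeline pipeline = jedis.pipelined();
            for (Tuple2<String, String> element : bufferedElements) {
                pipeline.set(element.f0, element.f1);
            }
            pipeline.sync();
        }
        bufferedElements.clear();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Any records not yet flushed survive a failure via operator state.
        checkpointedState.clear();
        for (Tuple2<String, String> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, String>> descriptor =
                new ListStateDescriptor<>(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, String> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}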

That's pretty much it. Thank you for your help again!
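
For completeness: option 1 from Piotr's list below (keeping the buffer only in
memory rather than in Flink state) would just flush inside `snapshotState()`
instead of copying the buffer into state, i.e. roughly replace the
snapshotState above with:

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Force every buffered record out to Redis before the checkpoint
        // completes, instead of storing the buffer in operator state.
        flushToRedis();
    }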


On Thu, Jul 8, 2021 at 11:09 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Great, thanks for coming back and I'm glad that it works for you!
>
> Piotrek
>
> On Thu, Jul 8, 2021 at 1:34 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> Hi Piotr,
>>
>> Thanks! I ended up doing option 1, and it works great.
>>
>> Best,
>> Yik San
>>
>> On Tue, May 25, 2021 at 11:43 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> You could always buffer records in your sink function/operator until a
>>> large enough batch has accumulated, then upload the whole batch at once. Note
>>> that if you want at-least-once or exactly-once semantics, you will need to
>>> take care of those buffered records in one way or another. For example you
>>> could:
>>> 1. Buffer records in some in-memory data structure (not Flink's state),
>>> and just make sure that those records are flushed to the underlying sink on
>>> `CheckpointedFunction#snapshotState()` calls.
>>> 2. Buffer records in Flink's state (heap state backend or RocksDB - the
>>> heap state backend would be the fastest with little overhead, but you risk
>>> running out of memory), which easily gives you exactly-once. That way your
>>> batch could span multiple checkpoints.
>>> 3. Buffer/write records to temporary files, but in that case keep in
>>> mind that those files need to be persisted and recovered in case of failure
>>> and restart.
>>> 4. Ignore checkpointing and either always restart the job from scratch
>>> or accept some occasional data loss.
>>>
>>> FYI, virtually every connector/sink internally batches writes to some
>>> extent, usually by doing option 1.
>>>
>>> Piotrek
>>>
>>> On Tue, May 25, 2021 at 2:50 PM Yik San Chan <evan.chanyik...@gmail.com>
>>> wrote:
>>>
>>>> Hi community,
>>>>
>>>> I have a Hive table that stores tens of millions of rows of data. In my
>>>> Flink job, I want to process the data in a batch manner:
>>>>
>>>> - Split the data into batches, each batch holding (maybe) 10,000 rows.
>>>> - For each batch, call a batchPut() API on my Redis client to dump it
>>>> into Redis.
>>>>
>>>> Doing so in a record-by-record streaming manner is not desirable, as that
>>>> would cause too many round trips between Flink workers and Redis.
>>>>
>>>> Is there a way to do that? I find little guidance in the Flink docs, since
>>>> almost all APIs seem better suited to streaming processing by default.
>>>>
>>>> Thank you!
>>>>
>>>> Best,
>>>> Yik San
>>>>
>>>
