Oops, I forgot to mention that when doing bulk inserts into Redis, you'd
better open a pipeline with the 'transaction' property set to False [1].

Otherwise, API calls from your Flink job will time out.
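
In case your loader is JVM-based rather than Python, a rough Lettuce
analogue of a non-transactional redis-py pipeline is to disable
auto-flush, so commands are pipelined without a MULTI/EXEC block. This is
only a sketch; the address, data, and timeout below are made-up
placeholders:

import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class RedisBulkLoader {

    public static void bulkInsert(Map<String, String> entries) {
        RedisClient client = RedisClient.create("redis://localhost:6379");
        StatefulRedisConnection<String, String> conn = client.connect();
        RedisAsyncCommands<String, String> async = conn.async();

        // Queue commands client-side instead of writing them one at a
        // time; this is plain pipelining, no MULTI/EXEC on the server.
        conn.setAutoFlushCommands(false);
        List<RedisFuture<?>> futures = new ArrayList<>();
        for (Map.Entry<String, String> e : entries.entrySet()) {
            futures.add(async.set(e.getKey(), e.getValue()));
        }
        conn.flushCommands(); // send the whole batch at once
        LettuceFutures.awaitAll(30, TimeUnit.SECONDS,
                futures.toArray(new RedisFuture[0]));

        conn.setAutoFlushCommands(true);
        conn.close();
        client.shutdown();
    }
}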

[1] https://github.com/andymccurdy/redis-py#pipelines

On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Navneeth,
>
> I reported a similar issue to yours before [1], but at first I took the
> broadcasting approach.
>
> As you already anticipated, broadcasting is going to use more memory than
> your current approach based on a static object on each TM.
>
> And the broadcast data will be treated as operator state and will be
> periodically checkpointed, with serialization overhead and garbage
> collection pressure.
> These are not negligible at all if you don't choose your serialization
> strategy carefully, as explained in [2].
> Even with the proper one, I've experienced mild back pressure whenever:
> - a checkpoint is in progress (AFAIK, incremental checkpointing has
> nothing to do with operator state)
> - the cache is being broadcast
>
> For that reason, I decided to populate the data on Redis, but that also
> calls for design decisions:
> - which Java client to use? Jedis [3]? Lettuce [4]?
> - how to invoke API calls inside Flink? synchronously or asynchronously?
>
> Currently I'm very satisfied with Lettuce plus Flink's async I/O [5]: the
> memory footprint is very small, and there's no serialization overhead or
> garbage collection to worry about.
> Lettuce supports asynchronous communication, so it works perfectly with
> Flink's async I/O; see the sketch below.
> I bet you'll be very disappointed with invoking Jedis synchronously inside
> a ProcessFunction.
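>
> A minimal sketch of that combination; the Redis address, key prefix, and
> output format below are made-up placeholders:
>
> import io.lettuce.core.RedisClient;
> import io.lettuce.core.api.StatefulRedisConnection;
> import io.lettuce.core.api.async.RedisAsyncCommands;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
>
> import java.util.Collections;
>
> public class RedisLookupFunction extends RichAsyncFunction<String, String> {
>
>     private transient RedisClient client;
>     private transient StatefulRedisConnection<String, String> conn;
>     private transient RedisAsyncCommands<String, String> commands;
>
>     @Override
>     public void open(Configuration parameters) {
>         client = RedisClient.create("redis://localhost:6379");
>         conn = client.connect();
>         commands = conn.async();
>     }
>
>     @Override
>     public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
>         // Lettuce returns a CompletionStage, so no task thread blocks
>         // while waiting for Redis to respond.
>         commands.get("cache:" + key).whenComplete((value, error) -> {
>             if (error != null) {
>                 resultFuture.completeExceptionally(error);
>             } else {
>                 resultFuture.complete(Collections.singleton(key + "," + value));
>             }
>         });
>     }
>
>     @Override
>     public void close() {
>         conn.close();
>         client.shutdown();
>     }
> }
>
> You'd then wire it in with AsyncDataStream.unorderedWait(stream, new
> RedisLookupFunction(), 1000, TimeUnit.MILLISECONDS, 100), where the last
> argument caps the number of in-flight requests.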
>
> Best,
>
> Dongwon
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
> [2]
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> [3] https://github.com/redis/jedis
> [4] https://lettuce.io/
> [5]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>
> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> We have a Flink streaming job processing around 200k events per second.
>> The job requires a lot of infrequently changing data (sort of static, but
>> there will be some changes over time, say a 5% change once per day or so).
>> There are about 12 caches, some containing approximately 20k entries and
>> a few with about 2 million entries.
>>
>> In the current implementation we are using an in-memory, lazily loaded
>> static cache to populate the data, and the initialization happens in the
>> open() function. The reason for choosing this approach is that we have
>> allocated around 4GB of extra memory per TM for these caches, and if a TM
>> has 6 slots the cache can be shared; a sketch follows below.
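>>
>> Roughly, the pattern looks like this; the types and the loader call are
>> illustrative placeholders:
>>
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.configuration.Configuration;
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> public class CachedEnrichFunction extends RichMapFunction<String, String> {
>>
>>     // One copy per TM JVM: every slot running this function shares it.
>>     private static volatile Map<String, String> cache;
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         if (cache == null) {
>>             synchronized (CachedEnrichFunction.class) {
>>                 if (cache == null) {
>>                     cache = loadViaRestApi(); // slow lazy load, once per TM
>>                 }
>>             }
>>         }
>>     }
>>
>>     @Override
>>     public String map(String key) {
>>         return key + "," + cache.get(key);
>>     }
>>
>>     private static Map<String, String> loadViaRestApi() {
>>         return new HashMap<>(); // placeholder for the real REST-API loader
>>     }
>> }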
>>
>> Now, the issue with this approach is that every time a container is
>> restarted or a new job is deployed, it has to populate the cache again.
>> Sometimes this lazy loading takes a while, and it causes back pressure as
>> well. We were thinking of moving this logic to a broadcast stream (see
>> the sketch below), but since the data has to be stored per slot, it would
>> increase memory consumption by a lot.
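>>
>> For reference, the broadcast wiring we had in mind looks roughly like
>> this; the stream names, descriptor name, and types are placeholders:
>>
>> import org.apache.flink.api.common.state.MapStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.datastream.BroadcastStream;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> public class BroadcastEnrichment {
>>
>>     public static DataStream<String> enrich(
>>             DataStream<String> events,
>>             DataStream<Tuple2<String, String>> refStream) {
>>         final MapStateDescriptor<String, String> desc = new
>>                 MapStateDescriptor<>("referenceData", Types.STRING, Types.STRING);
>>         BroadcastStream<Tuple2<String, String>> refBroadcast =
>>                 refStream.broadcast(desc);
>>
>>         return events.connect(refBroadcast).process(
>>                 new BroadcastProcessFunction<String, Tuple2<String, String>, String>() {
>>                     @Override
>>                     public void processElement(String event, ReadOnlyContext ctx,
>>                             Collector<String> out) throws Exception {
>>                         // Every slot reads its own copy of the broadcast state.
>>                         out.collect(event + "," + ctx.getBroadcastState(desc).get(event));
>>                     }
>>
>>                     @Override
>>                     public void processBroadcastElement(Tuple2<String, String> update,
>>                             Context ctx, Collector<String> out) throws Exception {
>>                         ctx.getBroadcastState(desc).put(update.f0, update.f1);
>>                     }
>>                 });
>>     }
>> }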
>>
>> Another option we were considering is to replace the current near/far
>> cache, which uses a REST API to load the data, with a Redis-based
>> near/far cache. This would definitely reduce the overall loading time,
>> but it's still not a perfect solution.
>>
>> Are there any recommendations on how this can be achieved effectively?
>> Also, how is everyone else overcoming this problem?
>>
>> Thanks,
>> Navneeth
>>
>>
