Hi Navneeth,

> I didn't quite understand how async io can be used here. It would be great
> if you can share some info on it.

You need to add an async I/O operator in the middle of your pipeline in
order to enrich your input data. [1] and [2] will help you.
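
To give you a rough idea, here is a minimal sketch (not my actual job) of
such an operator using Lettuce; RoadEvent, EnrichedRoadEvent, the Redis
address, and the key layout are placeholders:

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;

public class RedisEnrichmentFunction
        extends RichAsyncFunction<RoadEvent, EnrichedRoadEvent> {

    private transient RedisClient client;
    private transient StatefulRedisConnection<String, String> connection;

    @Override
    public void open(Configuration parameters) {
        client = RedisClient.create("redis://localhost:6379");
        connection = client.connect();
    }

    @Override
    public void asyncInvoke(RoadEvent input,
            ResultFuture<EnrichedRoadEvent> resultFuture) {
        // Non-blocking HGET on the versioned hash; the road ID is the field.
        // In my setup the key is swapped when a new version arrives (more on
        // that below).
        connection.async().hget("road-attrs:v1", input.getRoadId())
                .thenAccept(attrs -> resultFuture.complete(Collections
                        .singleton(new EnrichedRoadEvent(input, attrs))));
    }

    @Override
    public void close() {
        connection.close();
        client.shutdown();
    }
}

You then attach it to your stream via AsyncDataStream, e.g.:

AsyncDataStream.unorderedWait(
        events, new RedisEnrichmentFunction(), 1000, TimeUnit.MILLISECONDS, 100);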

> Also how are you propagating any changes to values?

I need to maintain a mapping from road ID to various attributes of each
road, and the mapping is updated every week.
I use keys for versioning, and each value is a Redis Hash [3] that stores
the mapping.
When a new mapping is prepared, I upload it under a fresh key while the
previous version is still being served to Flink (via async I/O).
Such concurrent reads and writes are possible in Redis as long as you turn
off transactions when creating the Redis client's pipeline [4].
When the new mapping is completely uploaded, I inform my Flink pipeline of
the new key via Kafka.
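
The message itself only needs to carry the name of the new key. A minimal
sketch with the plain Kafka producer (topic name, addresses, and the key
name are placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class NotifyNewMapping {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // The Flink job consumes this topic and swaps the key it reads from
        // on subsequent async lookups.
        try (KafkaProducer<String, String> producer =
                new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("mapping-versions",
                    "road-attrs:v2"));
        }
    }
}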

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
[2] https://www.youtube.com/watch?v=UParyxe-2Wc
[3] https://redis.io/topics/data-types#hashes
[4] https://github.com/andymccurdy/redis-py#pipelines

Best,

Dongwon

On Fri, Nov 27, 2020 at 4:31 PM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Thanks Dongwon. It was extremely helpful. I didn't quite understand how
> async io can be used here. It would be great if you can share some info on
> it.
>
> Also how are you propagating any changes to values?
>
> Regards,
> Navneeth
>
> On Thu, Nov 26, 2020 at 6:26 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Oops, I forgot to mention that when doing bulk inserts into Redis, you'd
>> better open a pipeline with the 'transaction' property set to False [1].
>>
>> Otherwise, API calls from your Flink job will time out.
>>
>> [1] https://github.com/andymccurdy/redis-py#pipelines
>>
>> On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Navneeth,
>>>
>>> I reported a similar issue to yours before [1], but at first I took the
>>> broadcasting approach.
>>>
>>> As you already anticipated, broadcasting is going to use more memory
>>> than your current approach based on a static object on each TM.
>>>
>>> And the broadcast data will be treated as operator state and will be
>>> periodically checkpointed, with serialization overhead and garbage
>>> collection pressure.
>>> These are not negligible at all if you don't carefully choose a
>>> serialization strategy, as explained in [2].
>>> Even with the proper one, I've experienced mild back pressure whenever:
>>> - a checkpoint is in progress (AFAIK, incremental checkpointing has
>>> nothing to do with operator state)
>>> - the cache is being broadcast
>>>
>>> For that reason, I decided to populate the data on Redis, but that also
>>> calls for design decisions:
>>> - which Java client to use? Jedis [3]? Lettuce [4]?
>>> - how to invoke API calls inside Flink? synchronously or asynchronously?
>>>
>>> Currently I'm very satisfied with Lettuce plus Flink's async I/O [5]: a
>>> very small memory footprint, and no need to worry about serialization
>>> overhead and garbage collection.
>>> Lettuce supports asynchronous communication, so it works perfectly with
>>> Flink's async I/O.
>>> I bet you'll be very disappointed with invoking Jedis synchronously
>>> inside a ProcessFunction.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> [1]
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
>>> [2]
>>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>> [3] https://github.com/redis/jedis
>>> [4] https://lettuce.io/
>>> [5]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>>
>>> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <
>>> reachnavnee...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> We have a Flink streaming job processing around 200k events per second.
>>>> The job requires a lot of infrequently changing data (sort of static, but
>>>> there will be some changes over time, say a 5% change once per day or so).
>>>> There are about 12 caches, with some containing approximately 20k
>>>> entries and a few containing about 2 million entries.
>>>>
>>>> In the current implementation we are using an in-memory, lazily loaded
>>>> static cache to populate the data, and the initialization happens in the
>>>> open function. The reason for choosing this approach is that we have
>>>> allocated around 4GB of extra memory per TM for these caches, and if a TM
>>>> has 6 slots the cache can be shared.
>>>>
>>>> Now the issue we have with this approach is that every time a container
>>>> is restarted or a new job is deployed, it has to populate the cache again.
>>>> Sometimes this lazy loading takes a while, and it causes back pressure as
>>>> well. We were thinking of moving this logic to a broadcast stream, but
>>>> since the data has to be stored per slot, it would increase the memory
>>>> consumption by a lot.
>>>>
>>>> Another option we were considering is to replace the current near/far
>>>> cache, which uses a REST API to load the data, with a Redis-based
>>>> near/far cache. This would definitely reduce the overall loading time,
>>>> but it's still not a perfect solution.
>>>>
>>>> Are there any recommendations on how this can be achieved effectively?
>>>> Also how is everyone overcoming this problem?
>>>>
>>>> Thanks,
>>>> Navneeth
>>>>
>>>>
