Oops, I forgot to mention that when doing a bulk insert into Redis, you'd better open the pipeline with the 'transaction' property set to False [1]. Otherwise the whole batch is wrapped in a single MULTI/EXEC transaction, which ties up the server long enough that API calls from your Flink job will time out. A Java sketch of the same idea is right below, and I've also appended a sketch of the Lettuce + async I/O operator at the very bottom, under the quoted thread.
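The [1] reference is to redis-py's pipeline API; if your loader is written in Java, the closest analogue I know of is a plain Jedis pipeline, which batches commands without wrapping them in MULTI/EXEC. This is only a minimal sketch under that assumption (the host, port, and sample keys are placeholders):

import java.util.Map;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class BulkLoad {

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            bulkLoad(jedis, Map.of("cache:k1", "v1", "cache:k2", "v2"));
        }
    }

    // jedis.pipelined() batches commands WITHOUT a MULTI/EXEC wrapper,
    // so the server can interleave other clients' requests (e.g. lookups
    // from the running Flink job) between the batched writes instead of
    // blocking them until the whole load commits.
    static void bulkLoad(Jedis jedis, Map<String, String> entries) {
        Pipeline pipeline = jedis.pipelined();
        entries.forEach(pipeline::set);
        pipeline.sync(); // flush the batch and wait for all replies
    }
}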
[1] https://github.com/andymccurdy/redis-py#pipelines

On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Navneeth,
>
> I reported an issue similar to yours before [1], but at first I took the
> broadcasting approach.
>
> As you already anticipated, broadcasting is going to use more memory than
> your current approach based on a static object on each TM.
>
> In addition, the broadcasted data is treated as operator state and is
> periodically checkpointed, with serialization overhead and garbage
> collection on top. Neither cost is negligible unless you choose your
> serialization strategy carefully, as explained in [2]. Even with the
> proper one, I've experienced mild back pressure whenever
> - a checkpoint is in progress (AFAIK, incremental checkpointing does
>   nothing for operator state), or
> - the cache is being re-broadcast.
>
> For that reason I decided to populate the data in Redis instead, but that
> also calls for design decisions:
> - which Java client to use? Jedis [3]? Lettuce [4]?
> - how to invoke the API calls inside Flink? synchronously or
>   asynchronously?
>
> Currently I'm very satisfied with Lettuce plus Flink's async I/O [5]: the
> memory footprint is very small, and there's no serialization overhead or
> garbage collection to worry about. Lettuce supports asynchronous
> communication, so it works perfectly with Flink's async I/O. I bet you'd
> be very disappointed with invoking Jedis synchronously inside a
> ProcessFunction.
>
> Best,
>
> Dongwon
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
> [2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> [3] https://github.com/redis/jedis
> [4] https://lettuce.io/
> [5] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>
> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> We have a Flink streaming job processing around 200k events per second.
>> The job requires a lot of less frequently changing data (sort of static,
>> but there will be some changes over time, say a 5% change once per day
>> or so). There are about 12 caches, some containing approximately 20k
>> entries and a few with about 2 million entries.
>>
>> In the current implementation we are using an in-memory, lazily loaded
>> static cache to populate the data, and the initialization happens in the
>> open function. The reason for choosing this approach is that we have
>> allocated around 4 GB of extra memory per TM for these caches, and if a
>> TM has 6 slots the cache can be shared.
>>
>> The issue with this approach is that every time a container is restarted
>> or a new job is deployed, it has to populate the cache again. Sometimes
>> this lazy loading takes a while and causes back pressure as well. We
>> were thinking of moving this logic to a broadcast stream, but since the
>> data would have to be stored per slot, it would increase memory
>> consumption by a lot.
>>
>> Another option we were considering is to replace the current near/far
>> cache, which uses a REST API to load the data, with a Redis-based
>> near/far cache. This would definitely reduce the overall loading time,
>> but it's still not a perfect solution.
>>
>> Are there any recommendations on how this can be achieved effectively?
>> Also, how is everyone else overcoming this problem?
>>
>> Thanks,
>> Navneeth
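P.S. Here is the sketch of the Lettuce + Flink async I/O operator mentioned at the top. It's a minimal sketch of the setup described above, not my production code: the Event/EnrichedEvent types, the cacheKey() scheme, the Redis URI, and the timeout/capacity numbers are hypothetical placeholders to adapt to your job.

import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

// Hypothetical event types, just to make the sketch compile.
class Event implements Serializable {
    public String key;
    public String cacheKey() { return "cache:" + key; }
}

class EnrichedEvent implements Serializable {
    public Event event;
    public String cachedValue;
    public EnrichedEvent() {}
    public EnrichedEvent(Event event, String cachedValue) {
        this.event = event;
        this.cachedValue = cachedValue;
    }
}

public class RedisLookup extends RichAsyncFunction<Event, EnrichedEvent> {

    private transient RedisClient client;
    private transient StatefulRedisConnection<String, String> connection;
    private transient RedisAsyncCommands<String, String> commands;

    @Override
    public void open(Configuration parameters) {
        // One connection per parallel subtask is enough: Lettuce
        // multiplexes concurrent commands over a single connection,
        // so no connection pool is needed.
        client = RedisClient.create("redis://localhost:6379");
        connection = client.connect();
        commands = connection.async();
    }

    @Override
    public void asyncInvoke(Event input, ResultFuture<EnrichedEvent> resultFuture) {
        // RedisFuture implements CompletionStage, so the task thread
        // never blocks; the callback runs on Lettuce's event loop.
        commands.get(input.cacheKey()).whenComplete((value, err) -> {
            if (err != null) {
                resultFuture.completeExceptionally(err);
            } else {
                resultFuture.complete(
                        Collections.singleton(new EnrichedEvent(input, value)));
            }
        });
    }

    @Override
    public void close() {
        if (connection != null) {
            connection.close();
        }
        if (client != null) {
            client.shutdown();
        }
    }

    // Wiring it in: the per-request timeout and the in-flight capacity
    // cap how far the operator can run ahead of Redis, which is what
    // keeps back pressure graceful when Redis slows down.
    public static DataStream<EnrichedEvent> enrich(DataStream<Event> events) {
        return AsyncDataStream.unorderedWait(
                events, new RedisLookup(), 100, TimeUnit.MILLISECONDS, 1000);
    }
}

Use unorderedWait unless you really need the input order preserved; orderedWait has to buffer results until earlier lookups complete, which adds latency for no benefit in a simple cache-enrichment job.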