Hi Vishal

I would like to understand your requirements first. As I read them: after a
stateful operator receives a notification, it needs to traverse all the
data stored in its state and communicate with an external system during
the traversal (perhaps similar to a join?). To make this more efficient,
you want to do it asynchronously, so that state updates for different keys
do not block each other while waiting on the external communication.
If I understand correctly, that is not possible with what
KeyedBroadcastProcessFunction currently offers.
Whether there are other solutions depends on the specific scenario, for
example what kind of external system is involved. Could you describe in
detail the scenario behind this requirement and the external systems it
depends on?
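
On the onComplete versus Await.result question, a minimal plain-Scala
sketch (no Flink involved) may help show the difference. The
callExternalService helper and the single-thread pool are hypothetical
stand-ins, not anything from your job. The point is that an onComplete
callback runs later on a thread of the ExecutionContext rather than on the
calling thread, while Await.result parks the calling thread until the
result arrives. As far as I know, keyed state is only meant to be accessed
from the task thread while it processes an element, so a callback running
on another thread should not touch it directly.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object FutureThreadingSketch {
  // Hypothetical stand-in for a call to an external service.
  def callExternalService(key: String)(implicit ec: ExecutionContext): Future[String] =
    Future {
      Thread.sleep(50) // simulated network latency
      s"result-for-$key"
    }

  // Returns (caller thread name, callback thread name, awaited value).
  def run(): (String, String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val callerThread = Thread.currentThread().getName
      val callbackThread = Promise[String]()

      val f = callExternalService("k1")

      // Non-blocking: the callback is scheduled on `ec`, so it runs on a
      // pool thread, not on the thread that registered it.
      f.onComplete(_ => callbackThread.success(Thread.currentThread().getName))

      // Blocking: the calling thread waits here until the future completes.
      val value = Await.result(f, 5.seconds)
      val cbThread = Await.result(callbackThread.future, 5.seconds)
      (callerThread, cbThread, value)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (caller, callback, value) = run()
    println(s"caller thread:   $caller")
    println(s"callback thread: $callback")
    println(s"value:           $value")
  }
}
```

The two thread names printed by main differ, which is the reason a
callback registered inside a synchronous operator cannot simply be treated
as if it ran in the operator's own processing call.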

Best,
Guowei


On Fri, Apr 29, 2022 at 12:42 AM Vishal Surana <vis...@moengage.com> wrote:

> Hello,
> My application has a stateful operator which leverages RocksDB to store a
> large amount of state. It, along with other operators receive configuration
> as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends
> upon another input stream that triggers some communication with external
> services whose results are then combined to yield the state that gets
> stored in RocksDB.
>
> In order to make the application more efficient, I am going to switch to
> asynchronous IO but as the result is ultimately going to be a (Scala)
> Future, I will have to block once to get the result. I was hoping to
> leverage the Async IO operator but that apparently doesn't support RocksDB
> based state storage. Am I correct in saying
> that KeyedBroadcastProcessFunction is the only option I have? If so, then I
> want to understand how registering a future's callbacks (via onComplete)
> works with a synchronous operator such as KeyedBroadcastProcessFunction.
> Will the thread executing the function simply relinquish control to some
> other subtask while the results of the external services are being awaited?
> Will the callback eventually be triggered automatically or will I have to
> explicitly block on the result future like so: Await.result(f, timeout).
>
> --
> Regards,
> Vishal
>
