Hi Vishal,

If your scenario requires updating the data in full every time, one idea is to rerun the job each time. For example, you could have an `EnrichWithDatabaseAndWebService` job that reads all of the data from the data source on every run and then joins it with the DB and the web services. Every time you need to re-enrich, you start the job again.
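The full-refresh idea above can be sketched in plain Scala, outside of Flink, to show its shape: every run re-reads all input, re-joins it against the current reference data, and produces a fresh result, with nothing carried over between runs. This is a minimal sketch under assumptions. The record types `Event` and `Enriched` and the parameters `readAll`, `dbSegments`, and `webScore` are hypothetical stand-ins for a Flink source, a DB snapshot, and a web-service client. It is not the Flink API.

```scala
// Hypothetical record types, for illustration only.
final case class Event(id: String, userId: String)
final case class Enriched(id: String, userId: String, segment: String, score: Int)

object FullRefreshEnrichment {
  // One complete run: read *all* input, join it against a DB snapshot and a
  // web-service lookup, and return a brand-new result set. No state survives
  // from the previous run, so re-enriching simply means calling this again.
  def runOnce(
      readAll: () => Seq[Event],      // hypothetical stand-in for the job's source
      dbSegments: Map[String, String], // hypothetical DB snapshot: userId -> segment
      webScore: String => Int          // hypothetical web-service call: userId -> score
  ): Seq[Enriched] =
    readAll().map { e =>
      val segment = dbSegments.getOrElse(e.userId, "unknown")
      Enriched(e.id, e.userId, segment, webScore(e.userId))
    }
}
```

The trade-off is that each run pays for reading and enriching everything, which is why how often the refresh must happen matters for whether this is practical.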
Also, could you briefly describe how frequently the data needs to be re-enriched?

Best,
Guowei

On Fri, Apr 29, 2022 at 2:20 PM Vishal Surana <vis...@moengage.com> wrote:

> Yes. You have explained my requirements exactly. My operator will talk to
> multiple databases and a couple of web services to enrich incoming input
> streams. I cannot think of a way to use the Async I/O operator, so I
> thought I might convert these 7-10 calls into async calls and chain the
> Futures together. I believe I have to block once at the end of the
> KeyedBroadcastProcessFunction, but if there's a way to avoid that while
> still ensuring ordered processing of events, then do let me know.
>
> On Fri, Apr 29, 2022 at 7:35 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> I want to understand your needs first. Your requirements are: after a
>> stateful operator receives a notification, it needs to traverse all the
>> data stored in the operator state and communicate with an external system
>> during the traversal (perhaps similar to a join?). To make this more
>> efficient, you want to take an asynchronous approach; that is, when you
>> modify the state of different keys, they should not block each other
>> because of external communication.
>> If I understand correctly, this is indeed impossible with the existing
>> functionality of KeyedBroadcastProcessFunction.
>> As for whether there are other solutions, it may depend on the specific
>> scenario, such as what kind of external system is involved. So could you
>> describe in detail the scenario behind this requirement and which external
>> systems it depends on?
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Apr 29, 2022 at 12:42 AM Vishal Surana <vis...@moengage.com>
>> wrote:
>>
>>> Hello,
>>>
>>> My application has a stateful operator which leverages RocksDB to store
>>> a large amount of state. It, along with other operators, receives
>>> configuration as a broadcast stream (KeyedBroadcastProcessFunction). The
>>> operator depends upon another input stream that triggers some
>>> communication with external services, whose results are then combined to
>>> yield the state that gets stored in RocksDB.
>>>
>>> In order to make the application more efficient, I am going to switch to
>>> asynchronous IO, but as the result is ultimately going to be a (Scala)
>>> Future, I will have to block once to get the result. I was hoping to
>>> leverage the Async I/O operator, but that apparently doesn't support
>>> RocksDB-based state storage. Am I correct in saying that
>>> KeyedBroadcastProcessFunction is the only option I have? If so, then I
>>> want to understand how registering a future's callbacks (via onComplete)
>>> works with a synchronous operator such as KeyedBroadcastProcessFunction.
>>> Will the thread executing the function simply relinquish control to some
>>> other subtask while the results of the external services are being
>>> awaited? Will the callback eventually be triggered automatically, or will
>>> I have to explicitly block on the result future like so:
>>> Await.result(f, timeout)?
>>>
>>> --
>>> Regards,
>>> Vishal
>>>
>>
>
> --
> Regards,
> Vishal
>
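The approach discussed in this thread (chaining the 7-10 external calls as Scala Futures and blocking once at the end) can be sketched in plain Scala, outside of Flink. This is a minimal sketch under assumptions: `lookupProfile`, `lookupSegment`, and `lookupScore` are hypothetical stand-ins for the real async DB and web-service clients, and the daemon thread pool and 5-second timeout are illustrative choices. The sketch also shows what `onComplete` does: it registers the callback and returns immediately, and the callback later runs on an `ExecutionContext` thread rather than on the calling thread. The caller does not yield to another subtask while waiting.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object ChainedLookups {
  // Hypothetical pool standing in for whatever ExecutionContext the real
  // async clients use. Daemon threads so the JVM can exit if a caller forgets shutdown().
  private val pool = java.util.concurrent.Executors.newFixedThreadPool(4, (r: Runnable) => {
    val t = new Thread(r)
    t.setDaemon(true)
    t
  })
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // Hypothetical async lookups; each returns a Future immediately.
  def lookupProfile(userId: String): Future[String] = Future(s"profile-$userId")
  def lookupSegment(profile: String): Future[String] = Future(s"$profile/segment")
  def lookupScore(userId: String): Future[Int] = Future(userId.length * 10)

  // Independent calls are started up front so they run concurrently;
  // the dependent call (segment needs profile) is chained with flatMap.
  def enrich(userId: String): Future[(String, Int)] = {
    val scoreF = lookupScore(userId)
    val segmentF = lookupProfile(userId).flatMap(lookupSegment)
    for {
      segment <- segmentF
      score <- scoreF
    } yield (segment, score)
  }

  // In a synchronous callback such as processElement, the only way to get the
  // value back on the calling thread is to block once on the composed Future.
  def enrichBlocking(userId: String, timeout: FiniteDuration = 5.seconds): (String, Int) =
    Await.result(enrich(userId), timeout)

  // onComplete registers the callback and returns at once; the callback runs
  // later on a pool thread. Returns (calling thread, callback thread).
  def callerThreadVsCallbackThread(userId: String): (Thread, Thread) = {
    val callbackThread = Promise[Thread]()
    enrich(userId).onComplete(_ => callbackThread.success(Thread.currentThread()))
    val caller = Thread.currentThread()
    (caller, Await.result(callbackThread.future, 5.seconds))
  }

  def shutdown(): Unit = pool.shutdown()
}
```

Because the callback runs on a different thread, it is not a safe place to update Flink state or emit records. As far as I know, Flink's `Collector` and keyed state are not thread-safe and are tied to the task thread (keyed state also depends on the current key set for `processElement`). That is why, inside a `KeyedBroadcastProcessFunction`, this pattern comes down to a single `Await.result`, as Vishal suggests, unless the Async I/O operator can be used instead.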