Hi Gopi,

Your use case is a little under-specified to give a specific answer, especially regarding the nature of the two input streams and the way events of both streams are correlated (joined):

* Is your fast-stream keyed?
  * If yes: keyed state and timers can be used. Otherwise only operator state can be used to buffer events, and no timers are available.
* Is your metadata-stream keyed? I.e.
  * Metadata-stream events are combined only with fast-stream events having the same respective key
    * Implement a KeyedCoProcessFunction …
  * Metadata-stream events apply to all fast-stream events irrespective of the key
    * Implement a KeyedBroadcastProcessFunction (after converting the metadata-stream to a broadcast stream)
    * Then in the processBroadcastElement function you can iterate over all keys of all state primitives
* None of your streams are keyed?
  * That leaves you only the option of using operator state
    * The current implementation of operator state is not incremental, and thus it is completely generated/stored with each state checkpoint
    * This allows only a moderate number of datapoints in operator state
* Which version of Flink are you using? The recommendations above refer to Flink 1.15.0

Looking forward to your answers (also please go a little more into detail of your use case) and follow-up questions …

Greetings
Thias

From: Gopi Krishna M <gopikrish...@gmail.com>
Sent: Monday, June 27, 2022 3:01 PM
To: Qingsheng Ren <re...@apache.org>
Cc: user@flink.apache.org
Subject: Re: Synchronizing streams in coprocessfunction

Thanks Qingsheng, that would definitely work. But I'm unable to figure out how I can apply this with CoProcessFunction. One stream is windowed and the trigger implementation uses the 2nd stream.

On Mon, Jun 27, 2022 at 3:29 PM Qingsheng Ren <re...@apache.org> wrote:

Hi Gopi,

What about using a window with a custom trigger? The window does nothing but aggregate your input into a collection. The trigger accepts metadata from the low-throughput input stream, so it can fire and purge the window (emit all elements in the window downstream) on arrival of metadata.

Best,
Qingsheng

> On Jun 27, 2022, at 12:46, Gopi Krishna M <gopikrish...@gmail.com> wrote:
>
> Hi,
> I've a scenario where I use connected streams where one is a low throughput metadata stream and another one is a high throughput data stream. I use CoProcessFunction that operates on a data stream with behavior controlled by a metadata stream.
>
> Is there a way to slow down/pause the high throughput data stream until I've received one entry from the metadata stream? It's possible that by the time I get the first element from the metadata stream, I might get 1000s of items from the data stream. One option is to create a state to buffer the data stream within the operator. Is there any other option which doesn't need this state management?
>
> Thanks,
> Gopi
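
For the keyed case discussed in this thread (metadata-stream events combined only with fast-stream events of the same key), the buffering logic might look roughly like the sketch below. It is a plain-Java model and deliberately does not use the Flink API: the two maps are stand-ins for keyed state (roughly a ListState for the buffer and a ValueState for the latest metadata), and the class name, the `enrich` helper and the String-typed events are all assumptions made for illustration. The method names only mirror the two callbacks of a co-process function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of "buffer the fast stream until metadata arrives", per key.
 * The maps stand in for Flink keyed state (assumed mapping, not Flink code):
 * buffers ~ a list state per key, metadata ~ a value state per key.
 */
class BufferUntilMetadata {
    private final Map<String, List<String>> buffers = new HashMap<>();
    private final Map<String, String> metadata = new HashMap<>();

    /** Fast-stream event: emit it if metadata for the key is known, otherwise buffer it. */
    List<String> processElement1(String key, String event) {
        List<String> out = new ArrayList<>();
        String meta = metadata.get(key);
        if (meta == null) {
            buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        } else {
            out.add(enrich(event, meta));
        }
        return out;
    }

    /** Metadata event: remember it and flush everything buffered for the key, in arrival order. */
    List<String> processElement2(String key, String meta) {
        metadata.put(key, meta);
        List<String> out = new ArrayList<>();
        List<String> pending = buffers.remove(key);
        if (pending != null) {
            for (String event : pending) {
                out.add(enrich(event, meta));
            }
        }
        return out;
    }

    /** Hypothetical combination of an event with its metadata. */
    private static String enrich(String event, String meta) {
        return event + "@" + meta;
    }
}
```

Feeding events `a` and `b` for key `k1` returns nothing until metadata `m1` arrives for `k1`, at which point `a@m1` and `b@m1` are emitted. Later events for `k1` pass straight through, while other keys keep buffering until their own metadata shows up. In a real job the buffer would live in checkpointed keyed state, so its size still matters.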