Re: Synchronizing streams in coprocessfunction
On Mon, Jun 27, 2022 at 7:47 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Gopi,
>
> Your use case is a little under-specified to give a specific answer,
> especially to the nature of the two input streams and the way events of
> both streams are correlated (joined):
>
> - Is your fast-stream keyed?
>   - If yes: keyed state and timers can be used, otherwise only
>     operator state can be used to buffer events, no timers

The fast stream is keyed. The metadata is a broadcast.

> - Is your metadata-stream keyed? I.e.
>   - Metadata-stream events are combined only to fast-stream events
>     having the same respective key
>     - Implement a KeyedCoProcessFunction …
>   - Metadata-stream events apply to all fast-stream events
>     irrespective of the key
>     - Implement a KeyedBroadcastProcessFunction (after converting
>       the metadata-stream to a broadcast stream)
>     - Then in the processBroadcastElement function you can iterate
>       over all keys of all state primitives
> - None of your streams are keyed?
>   - That leaves you only the option of using operator state
>   - Current implementation of operator state is not incremental
>     and thus it is completely generated/stored with each state checkpoint
>   - This allows only a moderate number of datapoints in operator
>     state

Correct. The main concern is that no flow control can be applied here. If the fast stream has very high throughput, a delay in the metadata stream will cause a lot of state to be persisted.

> - Which version of Flink are you using? Recommendations above
>   refer to Flink 1.15.0

1.14, but I can use the latest version if needed.

> Looking forward to your answers (also please go a little more into detail
> of your use case) and follow up questions …

Use case: We have a stream of events with a primary key (consider CDC). We need to store these events sorted by the primary key in a set of files (consider a DeltaLake table).

The Flink pipeline is used to split the stream and map each event to the file where it should go. The events arrive in arbitrary order of primary keys, and the mapping of keys to files can change over time. The metadata stream is a list of key ranges regularly updated from the file system.

The event stream is partitioned into multiple tasks (potentially by primary key), and each task gets the metadata via a broadcast connected to the keyed stream. In the CoProcess function of the connected stream, the file path is determined using the metadata, and then the stream is keyed again by file path to group all events that belong to the same file.

Now the problem is that when the job starts, it might take some time to query the metadata. During this time, a large number of events will be buffered.

Hope this clarifies the use case. If this implementation can be simplified, please let me know.

    cdcStream
        .keyBy(r -> "primaryKey")
        .connect(tableStream.broadcast())
        .process(new KeyedCoProcessFunction() {
            // Fast CDC stream: the file path is determined here from the metadata.
            @Override
            public void processElement1(RowData value, KeyedCoProcessFunction.Context ctx, Collector out) throws Exception {
            }

            // Metadata stream: key ranges regularly updated from the file system.
            @Override
            public void processElement2(DeltaTable value, KeyedCoProcessFunction.Context ctx, Collector out) throws Exception {
            }
        })
        .keyBy(Enriched::getFilePath)
        .map(Enriched::getData)
        .sink()
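The file-path lookup described in the use case above can be sketched in plain Java. This is only an illustration under stated assumptions, not the poster's implementation: the class `KeyRangeIndex`, the `Range` fields, the string keys, and the file names are all hypothetical, and the ranges are assumed to be non-overlapping with inclusive bounds. The index is a `TreeMap` keyed by range start, so finding the candidate range is a single `floorEntry` call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical model of the key-range metadata: each file covers [startKey, endKey]. */
final class KeyRangeIndex {

    /** One entry of a metadata snapshot; the field names are illustrative. */
    static final class Range {
        final String startKey;
        final String endKey;
        final String filePath;

        Range(String startKey, String endKey, String filePath) {
            this.startKey = startKey;
            this.endKey = endKey;
            this.filePath = filePath;
        }
    }

    // Ranges indexed by start key, so the only candidate for a key is its floor entry.
    private final TreeMap<String, Range> byStart = new TreeMap<>();

    /** Replaces the whole index with a fresh metadata snapshot. */
    void replaceAll(List<Range> snapshot) {
        byStart.clear();
        for (Range r : snapshot) {
            byStart.put(r.startKey, r);
        }
    }

    /** Returns the file whose range contains the key, or empty if no range covers it. */
    Optional<String> lookup(String primaryKey) {
        Map.Entry<String, Range> candidate = byStart.floorEntry(primaryKey);
        if (candidate == null || primaryKey.compareTo(candidate.getValue().endKey) > 0) {
            return Optional.empty();
        }
        return Optional.of(candidate.getValue().filePath);
    }

    public static void main(String[] args) {
        KeyRangeIndex index = new KeyRangeIndex();
        index.replaceAll(Arrays.asList(
                new Range("a", "f", "part-0"),
                new Range("g", "m", "part-1")));
        System.out.println(index.lookup("c")); // key inside the first range
        System.out.println(index.lookup("z")); // key not covered by any range
    }
}
```

An empty result corresponds to the situation discussed in the thread: the metadata does not (yet) cover the key, so the event has to be buffered or handled some other way.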
RE: Synchronizing streams in coprocessfunction
Hi Gopi,

Your use case is a little under-specified to give a specific answer, especially to the nature of the two input streams and the way events of both streams are correlated (joined):

* Is your fast-stream keyed?
  * If yes: keyed state and timers can be used, otherwise only operator state can be used to buffer events, no timers
* Is your metadata-stream keyed? I.e.
  * Metadata-stream events are combined only to fast-stream events having the same respective key
    * Implement a KeyedCoProcessFunction …
  * Metadata-stream events apply to all fast-stream events irrespective of the key
    * Implement a KeyedBroadcastProcessFunction (after converting the metadata-stream to a broadcast stream)
    * Then in the processBroadcastElement function you can iterate over all keys of all state primitives
* None of your streams are keyed?
  * That leaves you only the option of using operator state
  * Current implementation of operator state is not incremental and thus it is completely generated/stored with each state checkpoint
  * This allows only a moderate number of datapoints in operator state
* Which version of Flink are you using? Recommendations above refer to Flink 1.15.0

Looking forward to your answers (also please go a little more into detail of your use case) and follow up questions …

Greetings

Thias
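The keyed-plus-broadcast option above boils down to this: buffer fast-stream events per key while no metadata is known, then drain every key's buffer when the first metadata element arrives. A minimal plain-Java model of that logic, with no Flink dependency so it runs standalone, might look like the following. All names (`BufferUntilMetadata`, `onEvent`, `onMetadata`, the `enrich` function) are hypothetical. In a real job the per-key map would be keyed state (for example `ListState`), the metadata field would be broadcast state, and the drain loop would correspond to `Context.applyToKeyedState` inside `processBroadcastElement`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Plain-Java model (not Flink code) of "buffer per key until metadata arrives".
 * K = key type, E = event type, M = metadata type, O = output type.
 */
final class BufferUntilMetadata<K, E, M, O> {
    // Stands in for keyed state: one buffer per key, in first-seen key order.
    private final Map<K, List<E>> bufferedByKey = new LinkedHashMap<>();
    // Stands in for broadcast state: latest metadata, null until the first one arrives.
    private M metadata;
    // Hypothetical enrichment step, e.g. resolving a file path from the metadata.
    private final BiFunction<E, M, O> enrich;
    // Stands in for the Collector: everything emitted downstream, in order.
    private final List<O> emitted = new ArrayList<>();

    BufferUntilMetadata(BiFunction<E, M, O> enrich) {
        this.enrich = enrich;
    }

    /** Counterpart of processElement: emit if metadata is known, otherwise buffer. */
    void onEvent(K key, E event) {
        if (metadata == null) {
            bufferedByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        } else {
            emitted.add(enrich.apply(event, metadata));
        }
    }

    /** Counterpart of processBroadcastElement: store metadata, then drain all keys. */
    void onMetadata(M newMetadata) {
        metadata = newMetadata;
        for (List<E> buffer : bufferedByKey.values()) {
            for (E event : buffer) {
                emitted.add(enrich.apply(event, metadata));
            }
        }
        bufferedByKey.clear();
    }

    /** Number of events currently held back, summed over all keys. */
    int bufferedCount() {
        int total = 0;
        for (List<E> buffer : bufferedByKey.values()) {
            total += buffer.size();
        }
        return total;
    }

    List<O> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        BufferUntilMetadata<String, String, String, String> op =
                new BufferUntilMetadata<>((event, meta) -> event + "@" + meta);
        op.onEvent("k1", "a");       // buffered: no metadata yet
        op.onEvent("k2", "b");       // buffered: no metadata yet
        op.onMetadata("v1");         // drains both buffers
        op.onEvent("k1", "c");       // emitted immediately
        System.out.println(op.emitted());
    }
}
```

The model also makes the concern raised in the thread visible: the buffer is unbounded, so the amount of held-back state grows with the fast stream's throughput multiplied by the metadata delay.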
Re: Synchronizing streams in coprocessfunction
Thanks Qingsheng, that would definitely work. But I'm unable to figure out how I can apply this with CoProcessFunction. One stream is windowed and the trigger implementation uses the second stream.
Re: Synchronizing streams in coprocessfunction
Hi Gopi,

What about using a window with a custom trigger? The window does nothing but aggregate your input into a collection. The trigger accepts metadata from the low-throughput input stream, so it can fire and purge the window (emit all elements in the window downstream) on arrival of metadata.

Best,
Qingsheng
Synchronizing streams in coprocessfunction
Hi,

I have a scenario where I use connected streams: one is a low-throughput metadata stream and the other is a high-throughput data stream. I use a CoProcessFunction that operates on the data stream, with its behavior controlled by the metadata stream.

Is there a way to slow down/pause the high-throughput data stream until I've received one entry from the metadata stream? It's possible that by the time I get the first element from the metadata stream, I might get 1000s of items from the data stream. One option is to create a state to buffer the data stream within the operator. Is there any other option which doesn't need this state management?

Thanks,
Gopi