Hi Gopi,

Your use case is a little under-specified for a specific answer, especially 
regarding the nature of the two input streams and the way events of both 
streams are correlated (joined):

  *   Is your fast-stream keyed?
     *   If yes: keyed state and timers can be used; otherwise only operator 
state can be used to buffer events, and no timers are available
  *   Is your metadata-stream keyed? I.e.
     *   Metadata-stream events are combined only with fast-stream events 
that have the same key
        *   Implement a KeyedCoProcessFunction …
     *   Metadata-stream events apply to all fast-stream events irrespective of 
the key
        *   Implement a KeyedBroadcastProcessFunction (after converting the 
metadata-stream to a broadcast stream)
        *   Then in the processBroadcastElement function you can iterate over 
all keys of all state primitives
  *   Neither of your streams is keyed?
     *   That leaves you only the option of using operator state
        *   The current implementation of operator state is not incremental, 
so it is written out in full with every checkpoint
        *   This allows only a moderate number of datapoints in operator state
  *   Which version of Flink are you using? Recommendations above refer to 
Flink 1.15.0
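
A minimal, framework-free sketch of the keyed options above, written in plain 
Java so it runs without Flink. The maps stand in for keyed state (per key, a 
ValueState for metadata and a ListState for fast-stream events that arrived 
too early), and the class and method names (MetadataGate, onData, 
onKeyedMetadata, onBroadcastMetadata) are hypothetical. In a real job, onData 
and onKeyedMetadata would roughly correspond to processElement1 and 
processElement2 of a KeyedCoProcessFunction, while onBroadcastMetadata 
corresponds to processBroadcastElement of a KeyedBroadcastProcessFunction, 
where Context.applyToKeyedState lets you visit the state of every key. You 
would normally pick one of the two modes; combining them here only keeps the 
sketch short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of buffering fast-stream events until their metadata
 * arrives. The maps stand in for Flink keyed state; nothing here is Flink API.
 */
class MetadataGate {
    // Stand-in for a keyed ValueState holding the metadata of each key.
    private final Map<String, String> metadataByKey = new HashMap<>();
    // Stand-in for a keyed ListState holding events that arrived too early.
    private final Map<String, List<String>> bufferByKey = new HashMap<>();
    // Stand-in for broadcast state: metadata that applies to every key.
    private String broadcastMetadata;

    /** Fast stream: emit at once if metadata is known for the key, else buffer. */
    List<String> onData(String key, String event) {
        List<String> out = new ArrayList<>();
        String meta = metadataFor(key);
        if (meta != null) {
            out.add(enrich(meta, event));
        } else {
            bufferByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        }
        return out;
    }

    /** Keyed metadata (KeyedCoProcessFunction case): releases only this key's buffer. */
    List<String> onKeyedMetadata(String key, String meta) {
        metadataByKey.put(key, meta);
        return flush(key, meta);
    }

    /** Broadcast metadata (KeyedBroadcastProcessFunction case): releases every key's buffer. */
    List<String> onBroadcastMetadata(String meta) {
        broadcastMetadata = meta;
        List<String> out = new ArrayList<>();
        // Copy the key set because flush() removes entries while we iterate.
        for (String key : new ArrayList<>(bufferByKey.keySet())) {
            out.addAll(flush(key, metadataFor(key)));
        }
        return out;
    }

    // Keyed metadata takes precedence over broadcast metadata in this sketch.
    private String metadataFor(String key) {
        String keyed = metadataByKey.get(key);
        return keyed != null ? keyed : broadcastMetadata;
    }

    // Emits and clears everything buffered for one key.
    private List<String> flush(String key, String meta) {
        List<String> out = new ArrayList<>();
        List<String> buffered = bufferByKey.remove(key);
        if (buffered != null) {
            for (String event : buffered) {
                out.add(enrich(meta, event));
            }
        }
        return out;
    }

    // Hypothetical placeholder for "behavior controlled by the metadata stream".
    private static String enrich(String meta, String event) {
        return meta + ":" + event;
    }
}
```

Buffered events stay in state until metadata for their key arrives, so the 
buffer grows with the metadata delay; in the keyed case a timer could bound 
how long events are held.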

Looking forward to your answers (please also go into a little more detail on 
your use case) and to any follow-up questions …

Greetings

Thias


From: Gopi Krishna M <gopikrish...@gmail.com>
Sent: Monday, June 27, 2022 3:01 PM
To: Qingsheng Ren <re...@apache.org>
Cc: user@flink.apache.org
Subject: Re: Synchronizing streams in coprocessfunction

Thanks Qingsheng, that would definitely work. But I'm unable to figure out how 
to apply this with a CoProcessFunction: one stream is windowed, and the trigger 
implementation would need to use the second stream.

On Mon, Jun 27, 2022 at 3:29 PM Qingsheng Ren <re...@apache.org> wrote:
Hi Gopi,

What about using a window with a custom trigger? The window does nothing but 
aggregate your input into a collection. The trigger accepts metadata from the 
low-throughput input stream, so it can fire and purge the window (emit all 
elements in the window downstream) when metadata arrives.
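
To make the suggestion concrete, here is a plain-Java model (no Flink 
dependency) of a window whose trigger fires and purges on metadata. Decision 
is a hypothetical stand-in for Flink's TriggerResult (which does have CONTINUE 
and FIRE_AND_PURGE values), and CollectingWindow is likewise illustrative. One 
caveat, stated as an assumption: a Flink trigger only sees elements of the 
stream being windowed, so the metadata would presumably have to be merged into 
that stream (for example as a common wrapper type) before windowing. It would 
then also end up in the window contents, so the window function would likely 
need to filter it out, whereas this model simply does not store it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two Flink TriggerResult values this sketch uses. */
enum Decision { CONTINUE, FIRE_AND_PURGE }

/**
 * Plain-Java model of a window that only collects data elements and whose
 * trigger fires and purges when a metadata element arrives.
 */
class CollectingWindow {
    private final List<String> contents = new ArrayList<>();

    /** Trigger logic: data keeps accumulating, metadata fires and purges. */
    static Decision onElement(boolean isMetadata) {
        return isMetadata ? Decision.FIRE_AND_PURGE : Decision.CONTINUE;
    }

    /** Feeds one element in and returns whatever the window emits downstream. */
    List<String> accept(String element, boolean isMetadata) {
        if (!isMetadata) {
            contents.add(element); // the window just collects data elements
        }
        if (onElement(isMetadata) == Decision.FIRE_AND_PURGE) {
            List<String> fired = new ArrayList<>(contents); // fire: emit all elements
            contents.clear();                                // purge: empty the window
            return fired;
        }
        return new ArrayList<>();
    }
}
```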

Best,
Qingsheng

> On Jun 27, 2022, at 12:46, Gopi Krishna M <gopikrish...@gmail.com> wrote:
>
> Hi,
> I have a scenario with connected streams: one is a low-throughput metadata 
> stream and the other is a high-throughput data stream. I use a 
> CoProcessFunction that operates on the data stream, with its behavior 
> controlled by the metadata stream.
>
> Is there a way to slow down or pause the high-throughput data stream until 
> I've received one entry from the metadata stream? By the time I get the 
> first element from the metadata stream, I may already have received 1000s 
> of items from the data stream. One option is to create state to buffer the 
> data stream within the operator. Is there any other option that doesn't 
> need this state management?
>
> Thanks,
> Gopi

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.