Re: Synchronizing streams in coprocessfunction

2022-06-28 Thread Gopi Krishna M
On Mon, Jun 27, 2022 at 7:47 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Gopi,
>
> Your use case is a little under-specified to give a specific answer,
> especially regarding the nature of the two input streams and the way
> events of both streams are correlated (joined):
>
>   - *Is your fast-stream keyed?*
>     - If yes: keyed state and timers can be used; otherwise only
>       operator state can be used to buffer events, and no timers

The fast stream is keyed. The metadata is a broadcast.

>   - *Is your metadata-stream keyed?* I.e.
>     - Metadata-stream events are combined only with fast-stream events
>       having the same respective key
>       - Implement a KeyedCoProcessFunction …
>     - Metadata-stream events apply to all fast-stream events
>       irrespective of the key
>       - Implement a KeyedBroadcastProcessFunction (after converting
>         the metadata-stream to a broadcast stream)
>       - Then in the processBroadcastElement function you can iterate
>         over all keys of all state primitives
>   - *None of your streams are keyed?*
>     - That leaves you only the option of using operator state
>       - The current implementation of operator state is not incremental,
>         so it is completely generated/stored with each state checkpoint
>       - This allows only a moderate number of datapoints in operator
>         state

Correct. The main concern is that no flow control can be applied here.
If the fast stream has very high throughput, a delay in the metadata
stream will cause a lot of state to be persisted.

>   - *Which version of Flink are you using?* Recommendations above
>     refer to Flink 1.15.0

1.14, but I can use the latest version if needed.

> Looking forward to your answers (also please go a little more into detail
> of your use case) and follow-up questions …

Use case: We have a stream of events with a primary key (consider CDC). We
need to store these events, sorted by primary key, in a set of files
(consider a Delta Lake table). The Flink pipeline splits the stream and maps
each event to the file where it should go. The events arrive in arbitrary
primary-key order, and the mapping of keys to files can change over time.

The metadata stream is a list of key ranges, regularly updated from the file
system. The event stream is partitioned across multiple tasks (potentially by
primary key), and each task gets the metadata via a broadcast connected to the
keyed stream. In the CoProcess of the connected stream, the file path is
determined using the metadata, and the stream is then keyed again by file
path to group all events that belong to the same file.
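
A minimal, dependency-free sketch of the lookup step described above, in case it helps the discussion. It assumes the key ranges are non-overlapping, inclusive on both ends, and compared as plain strings; `KeyRangeIndex`, `FileRange`, and the `part-N` paths are hypothetical names for illustration and are not part of Flink or Delta Lake.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical model of the metadata: every file covers an inclusive range of primary keys.
class KeyRangeIndex {

    // One metadata entry; the class and field names are assumptions for illustration.
    static final class FileRange {
        final String startKey;
        final String endKey;
        final String filePath;

        FileRange(String startKey, String endKey, String filePath) {
            this.startKey = startKey;
            this.endKey = endKey;
            this.filePath = filePath;
        }
    }

    // Ranges indexed by start key, so floorEntry() finds the only candidate range.
    private final TreeMap<String, FileRange> byStartKey = new TreeMap<>();

    // Replaces the whole index, as a fresh metadata snapshot would.
    void update(List<FileRange> ranges) {
        byStartKey.clear();
        for (FileRange range : ranges) {
            byStartKey.put(range.startKey, range);
        }
    }

    // Returns the file whose range contains the key, or empty if no range matches.
    Optional<String> fileFor(String primaryKey) {
        Map.Entry<String, FileRange> candidate = byStartKey.floorEntry(primaryKey);
        if (candidate == null || primaryKey.compareTo(candidate.getValue().endKey) > 0) {
            return Optional.empty();
        }
        return Optional.of(candidate.getValue().filePath);
    }

    public static void main(String[] args) {
        KeyRangeIndex index = new KeyRangeIndex();
        index.update(List.of(
                new FileRange("a", "m", "part-0"),
                new FileRange("n", "t", "part-1")));
        System.out.println(index.fileFor("k"));
        System.out.println(index.fileFor("z"));
    }
}
```

In the pipeline, the returned path would be what the second keyBy groups on. An empty result corresponds to an event whose range is not yet known, which is exactly the case that has to be buffered.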

Now the problem is, when the job starts, it might take some time to query
the metadata. During this time, a large number of events will be buffered.

Hope this clarifies the use case. If this implementation can be simplified,
please let me know.

cdcStream
    .keyBy(r -> "primaryKey")
    .connect(tableStream.broadcast())
    .process(new KeyedCoProcessFunction<String, RowData, DeltaTable, Enriched>() {
        // Input 1: CDC events that need to be mapped to a file path.
        @Override
        public void processElement1(RowData value, Context ctx,
                Collector<Enriched> out) throws Exception {

        }

        // Input 2: metadata (key ranges per file) received via broadcast.
        @Override
        public void processElement2(DeltaTable value, Context ctx,
                Collector<Enriched> out) throws Exception {

        }
    })
    .keyBy(Enriched::getFilePath)
    .map(Enriched::getData)
    .sink()



RE: Synchronizing streams in coprocessfunction

2022-06-27 Thread Schwalbe Matthias
Hi Gopi,

Your use case is a little under-specified to give a specific answer, especially
regarding the nature of the two input streams and the way events of both
streams are correlated (joined):

  *   Is your fast-stream keyed?
      *   If yes: keyed state and timers can be used; otherwise only operator
          state can be used to buffer events, and no timers
  *   Is your metadata-stream keyed? I.e.
      *   Metadata-stream events are combined only with fast-stream events
          having the same respective key
          *   Implement a KeyedCoProcessFunction …
      *   Metadata-stream events apply to all fast-stream events irrespective
          of the key
          *   Implement a KeyedBroadcastProcessFunction (after converting the
              metadata-stream to a broadcast stream)
          *   Then in the processBroadcastElement function you can iterate over
              all keys of all state primitives
  *   None of your streams are keyed?
      *   That leaves you only the option of using operator state
          *   The current implementation of operator state is not incremental,
              so it is completely generated/stored with each state checkpoint
          *   This allows only a moderate number of datapoints in operator state
  *   Which version of Flink are you using? Recommendations above refer to
      Flink 1.15.0
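
To make the keyed-plus-broadcast option above concrete, here is a rough, dependency-free model of the buffering logic. It only mirrors the shape of the two callbacks of a KeyedBroadcastProcessFunction and uses no Flink API: the map stands in for per-key list state, the `metadata` field for broadcast state, and the name `BufferUntilMetadata` and the `enrich` function are assumptions for illustration. In a real job the drain step would presumably go through `applyToKeyedState` on the broadcast-side context.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model: buffer fast-stream events per key until the first metadata arrives.
class BufferUntilMetadata<K, E, M, O> {

    // Stands in for keyed list state; insertion-ordered so draining is deterministic.
    private final Map<K, List<E>> buffered = new LinkedHashMap<>();
    // Hypothetical function that combines an event with the current metadata.
    private final BiFunction<E, M, O> enrich;
    // Stands in for broadcast state; null until the first metadata element arrives.
    private M metadata;

    BufferUntilMetadata(BiFunction<E, M, O> enrich) {
        this.enrich = enrich;
    }

    // Fast-stream side: emit directly if metadata is known, otherwise buffer under the key.
    List<O> processElement(K key, E event) {
        List<O> out = new ArrayList<>();
        if (metadata == null) {
            buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        } else {
            out.add(enrich.apply(event, metadata));
        }
        return out;
    }

    // Metadata side: remember the update, then drain the buffers of all keys.
    List<O> processBroadcastElement(M update) {
        metadata = update;
        List<O> out = new ArrayList<>();
        for (List<E> events : buffered.values()) {
            for (E event : events) {
                out.add(enrich.apply(event, metadata));
            }
        }
        buffered.clear();
        return out;
    }

    // Number of events currently held back, i.e. what a checkpoint would have to store.
    int bufferedCount() {
        int count = 0;
        for (List<E> events : buffered.values()) {
            count += events.size();
        }
        return count;
    }
}
```

Note that this only moves the buffer into state; it does not slow down the fast stream, so the buffer still grows for as long as the metadata is delayed.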

Looking forward to your answers (also please go a little more into detail of
your use case) and follow-up questions …

Greetings

Thias



Re: Synchronizing streams in coprocessfunction

2022-06-27 Thread Gopi Krishna M
Thanks Qingsheng, that would definitely work. But I'm unable to figure out
how I can apply this with CoProcessFunction. One stream is windowed and the
trigger implementation uses the second stream.


Re: Synchronizing streams in coprocessfunction

2022-06-27 Thread Qingsheng Ren
Hi Gopi,

What about using a window with a custom trigger? The window does nothing
but collect your input. The trigger accepts metadata from the low-throughput
input stream, so it can fire and purge the window (emit all elements in the
window downstream) when metadata arrives.

Best,
Qingsheng

> On Jun 27, 2022, at 12:46, Gopi Krishna M  wrote:
> 
> Hi,
> I've a scenario where I use connected streams where one is a low throughput 
> metadata stream and another one is a high throughput data stream. I use 
> CoProcessFunction that operates on a data stream with behavior controlled by 
> a metadata stream.
> 
> Is there a way to slow down/pause the high throughput data stream until I've 
> received one entry from the metadata stream? It's possible that by the time I 
> get the first element from the metadata stream, I might get 1000s of items 
> from the data stream. One option is to create a state to buffer the data 
> stream within the operator. Is there any other option which doesn't need this 
> state management?
> 
> Thanks,
> Gopi