Re: Synchronizing streams in coprocessfunction

2022-06-28 Thread Gopi Krishna M
Collector out) throws Exception { } }) .keyBy(Enriched::getFilePath) .map(Enriched::getData) .sink() > Greetings > Thias > From: Gopi Krishna M > Sent: Monday, June 27, 2022 3:01 PM > To:

RE: Synchronizing streams in coprocessfunction

2022-06-27 Thread Schwalbe Matthias
: user@flink.apache.org Subject: Re: Synchronizing streams in coprocessfunction Thanks Qingsheng, that would definitely work. But I'm unable to figure out how to apply this with a CoProcessFunction. One stream is windowed, and the trigger implementation uses the second stream. On Mon, Jun 27, 2022 at

Re: Synchronizing streams in coprocessfunction

2022-06-27 Thread Gopi Krishna M
Thanks Qingsheng, that would definitely work. But I'm unable to figure out how to apply this with a CoProcessFunction. One stream is windowed, and the trigger implementation uses the second stream. On Mon, Jun 27, 2022 at 3:29 PM Qingsheng Ren wrote: > Hi Gopi, > > What about using a window with a cus

Re: Synchronizing streams in coprocessfunction

2022-06-27 Thread Qingsheng Ren
Hi Gopi, What about using a window with a custom trigger? The window does nothing but aggregate your input into a collection. The trigger accepts metadata from the low-throughput input stream, so it can fire and purge the window (emit all elements in the window downstream) when metadata arrives.
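
The fire-and-purge behavior described above can be modeled without any Flink dependency. What follows is only a minimal plain-Java sketch of the buffering logic, not a Flink Trigger implementation and not code from this thread. The class name `MetadataGatedBuffer` and its methods `onData`, `onMetadata` and `pending` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (no Flink APIs) of a window that only collects data
 * elements and is fired and purged when a metadata element arrives.
 * All names here are hypothetical and exist only for this illustration.
 */
class MetadataGatedBuffer<D, M> {
    // Plays the role of the window contents.
    private final List<D> buffer = new ArrayList<>();

    /** A data element arrives: collect it and emit nothing yet. */
    void onData(D element) {
        buffer.add(element);
    }

    /**
     * A metadata element arrives: emit everything buffered so far and
     * clear the buffer. The metadata value itself is ignored in this
     * sketch; a real job would presumably use it to steer processing.
     */
    List<D> onMetadata(M metadata) {
        List<D> fired = new ArrayList<>(buffer);
        buffer.clear();
        return fired;
    }

    /** Number of data elements still waiting for metadata. */
    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        MetadataGatedBuffer<String, String> window = new MetadataGatedBuffer<>();
        window.onData("a");
        window.onData("b");
        System.out.println(window.onMetadata("m1")); // prints [a, b]
        System.out.println(window.pending());        // prints 0
    }
}
```

In an actual Flink job the same idea would presumably live in a custom trigger on a window that holds both kinds of elements, returning a fire-and-purge result when a metadata element is seen. How to wire that up with the connected streams is exactly the open question raised in the follow-up messages of this thread.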

Synchronizing streams in coprocessfunction

2022-06-26 Thread Gopi Krishna M
Hi, I have a scenario with connected streams: one is a low-throughput metadata stream and the other is a high-throughput data stream. I use a CoProcessFunction that operates on the data stream, with its behavior controlled by the metadata stream. Is there a way to slow down/pause the high through