Hi,
I’m afraid there is currently now way to do what you want with the builtin 
window primitives. Each of the slices of the sliding windows is essentially 
evaluated independently. Therefore, there cannot be effects in one slice that 
influence processing of another slice.

What you could do is switch to tumbling windows, then each element would only 
be in one window. That probably won’t fit your use case anymore. The 
alternative I see to that is to implement everything in a custom operator where 
you deal with window states and triggering on time yourself. Let me know if you 
need some pointers about that one.

Cheers,
Aljoscha
> On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
> 
> Hello, 
> 
> I'm trying to implement a left outer join of two Kafka streams within a 
> sliding window. So far I have the following code:
> 
> foos
>   .coGroup(bars)
>   .where(_.baz).equalTo(_.baz)
>   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, 
> TimeUnit.SECONDS)))
>   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
>    fs.foreach(f =>
>     if (bs.isEmpty)
>       o.collect(FooBar(f, None))
>     else
>       bs.foreach(b => o.collect(FooBar(f, Some(b))))
>    )
>   )
> 
> However, this results in the pair being emitted from every window slide, 
> regardless of the match. The desired behaviour would be:
> * emit the the match as soon as it's found, don't emit any more pairs for it,
> * otherwise, emit the empty match, when the left side element leaves the last 
> of its windows
> 
> What would be the idiomatic/efficient way to implement such behaviour? Is it 
> possible at all with the coGroup/window mechanism, or some other way is 
> necessary?
> 
> Alex

Reply via email to