Hi,

I’m afraid there is currently no way to do what you want with the built-in window primitives. Each slice of a sliding window is essentially evaluated independently, so effects in one slice cannot influence the processing of another slice.

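To make that concrete: with a 1-minute window sliding every second, each element ends up in 60 overlapping windows, and the join function runs once per window. Below is a minimal sketch in plain Scala with no Flink dependency. `windowStarts` is a hypothetical helper that only approximates how a sliding assigner maps a timestamp to windows; it assumes non-negative millisecond timestamps and no window offset.

```scala
object SlidingWindowSlices {
  /** Start timestamps (ms) of every sliding window [start, start + sizeMs)
    * that contains `timestamp`. Hypothetical helper, not a Flink API. */
  def windowStarts(timestamp: Long, sizeMs: Long, slideMs: Long): List[Long] = {
    // Latest window start that is <= timestamp (aligned to the slide).
    val lastStart = timestamp - (timestamp % slideMs)
    // Walk backwards one slide at a time while the window still covers the timestamp.
    Iterator
      .iterate(lastStart)(_ - slideMs)
      .takeWhile(_ > timestamp - sizeMs)
      .toList
  }
}

// An element at t = 120500 ms, 1-minute windows sliding every second:
val starts = SlidingWindowSlices.windowStarts(120500L, 60000L, 1000L)
println(starts.size) // number of windows this single element belongs to
```

With tumbling windows (slide equal to size), the same helper returns exactly one start per element.
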
What you could do is switch to tumbling windows; then each element would be in only one window. That probably won’t fit your use case anymore, though. The alternative I see is to implement everything in a custom operator where you manage the window state and the time-based triggering yourself. Let me know if you need some pointers about that one.

Cheers,
Aljoscha

> On 26 Jan 2016, at 19:32, Alexander Gryzlov <alex.gryz...@gmail.com> wrote:
>
> Hello,
>
> I'm trying to implement a left outer join of two Kafka streams within a
> sliding window. So far I have the following code:
>
> foos
>   .coGroup(bars)
>   .where(_.baz).equalTo(_.baz)
>   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
>   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
>     fs.foreach(f =>
>       if (bs.isEmpty)
>         o.collect(FooBar(f, None))
>       else
>         bs.foreach(b => o.collect(FooBar(f, Some(b))))
>     )
>   )
>
> However, this results in the pair being emitted from every window slide,
> regardless of the match. The desired behaviour would be:
> * emit the match as soon as it's found, don't emit any more pairs for it,
> * otherwise, emit the empty match, when the left side element leaves the last
>   of its windows
>
> What would be the idiomatic/efficient way to implement such behaviour? Is it
> possible at all with the coGroup/window mechanism, or some other way is
> necessary?
>
> Alex
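
For the custom-operator route mentioned in the reply, the bookkeeping could look roughly like the sketch below. It is plain Scala with no Flink dependency, and every name in it (`LeftOuterJoinSketch`, `onLeft`, `onRight`, `onTime`) is hypothetical. It assumes each (left, right) pair should be emitted exactly once, and that an unmatched left element is emitted with `None` once `windowMs` has passed since its timestamp. Wiring this into an actual operator (keyed state, timers, fault tolerance) is left out.

```scala
import scala.collection.mutable

// Hypothetical helper, not a Flink API: per-key join state with manual expiry.
final class LeftOuterJoinSketch[K, L, R](windowMs: Long) {
  private case class LeftEntry(value: L, ts: Long, var matched: Boolean)
  private case class RightEntry(value: R, ts: Long)

  private val lefts  = mutable.Map.empty[K, mutable.ListBuffer[LeftEntry]]
  private val rights = mutable.Map.empty[K, mutable.ListBuffer[RightEntry]]

  /** A left element arrives: pair it with every buffered right still in range. */
  def onLeft(key: K, value: L, ts: Long): List[(L, Option[R])] = {
    val inRange = rights.get(key).map(_.toList).getOrElse(Nil)
      .filter(r => r.ts + windowMs > ts)
    lefts.getOrElseUpdate(key, mutable.ListBuffer.empty) +=
      LeftEntry(value, ts, matched = inRange.nonEmpty)
    inRange.map(r => (value, Some(r.value)))
  }

  /** A right element arrives: emit matches immediately and mark those lefts. */
  def onRight(key: K, value: R, ts: Long): List[(L, Option[R])] = {
    rights.getOrElseUpdate(key, mutable.ListBuffer.empty) += RightEntry(value, ts)
    val inRange = lefts.get(key).map(_.toList).getOrElse(Nil)
      .filter(l => l.ts + windowMs > ts)
    inRange.foreach(_.matched = true)
    inRange.map(l => (l.value, Some(value)))
  }

  /** Time advances: emit (left, None) for expired unmatched lefts, drop old state. */
  def onTime(now: Long): List[(L, Option[R])] = {
    val out = mutable.ListBuffer.empty[(L, Option[R])]
    for (buf <- lefts.values) {
      val expired = buf.filter(_.ts + windowMs <= now)
      expired.filterNot(_.matched).foreach(l => out += ((l.value, None)))
      buf --= expired
    }
    for (buf <- rights.values) buf --= buf.filter(_.ts + windowMs <= now)
    out.toList
  }
}

// Usage: "f1" finds a match right away, "f2" never does.
val join = new LeftOuterJoinSketch[String, String, Int](60000L)
join.onLeft("a", "f1", 0L)
println(join.onRight("a", 1, 1000L)) // the (f1, 1) pair, emitted once
join.onLeft("b", "f2", 2000L)
println(join.onTime(62000L))         // only the unmatched f2, paired with None
```

The point of the design is that a match is emitted at arrival time rather than at window evaluation time, so it cannot be repeated by later slides; only the empty match has to wait for a timer.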