Well, of course this does not fix the core problem. What I can do is extend the FixedWindows class so that elements arriving within my actually recorded "system latency" after a window boundary are still assigned to the previous window. Or is there a smarter way to deal with this?
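
A minimal sketch of that arithmetic in plain Java (no Beam classes are used, so this is not a WindowFn implementation). The 30-second window size and the 100 ms latency bound are assumptions chosen for illustration, and the class and method names are hypothetical:

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java sketch of the window arithmetic only; names are hypothetical.
class LatencyShiftedWindows {

    // Start of the epoch-aligned fixed window of length `size` containing `ts`.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // Same, but the timestamp is first moved back by the assumed latency, so an
    // element arriving just after a boundary lands in the previous window.
    static Instant shiftedWindowStart(Instant ts, Duration size, Duration latency) {
        return windowStart(ts.minus(latency), size);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(30);    // assumed window size
        Duration latency = Duration.ofMillis(100); // assumed latency bound
        Instant article = Instant.parse("2020-10-02T00:30:29.997Z");
        Instant assets = Instant.parse("2020-10-02T00:30:30.001Z");

        System.out.println(windowStart(article, size));                // 2020-10-02T00:30:00Z
        System.out.println(windowStart(assets, size));                 // 2020-10-02T00:30:30Z
        System.out.println(shiftedWindowStart(assets, size, latency)); // 2020-10-02T00:30:00Z
    }
}
```

With these assumed values the shifted assets fall into the same window as the article. A fixed shift only moves the boundary, though: assets at 00:30:00.060 that follow an article at 00:30:00.050 would be pushed into the window starting at 00:29:30, one window before the article's.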
On Fri, Oct 2, 2020 at 4:11 PM Kaymak, Tobias <[email protected]> wrote:

> This is what I came up with:
>
> https://gist.github.com/tkaymak/1f5eccf8633c18ab7f46f8ad01527630
>
> The first run looks okay (in my use case size and offset are the same),
> but I will need to add tests to prove my understanding of this.
>
> On Fri, Oct 2, 2020 at 12:05 PM Kaymak, Tobias <[email protected]>
> wrote:
>
>> Hello,
>>
>> In chapter 4 of the Streaming Systems book by Tyler, Slava and Reuven
>> there is an example 4-6 on page 111 about custom windowing that deals with
>> UnalignedFixedWindows:
>>
>> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>>
>> Unfortunately that example is abbreviated and the full source code is not
>> published in this repo:
>> https://github.com/takidau/streamingbook
>>
>> I am joining two Kafka streams and I am currently windowing them by fixed
>> time intervals. However, the elements in stream one ("articles") are
>> published first, then the assets for those articles are published in
>> the "assets" topic. Article event timestamps are therefore slightly before
>> those of assets.
>>
>> Now when doing a CoGroupByKey this can lead to a situation where an
>> article is not processed together with its assets, as
>>
>> - the article has a timestamp of 2020-10-02T00:30:29.997Z
>> - the assets have a timestamp of 2020-10-02T00:30:30.001Z
>>
>> Processing them together is a must in my pipeline, as otherwise I am
>> publishing an article without its assets.
>>
>> My idea was therefore to apply UnalignedFixedWindows instead of fixed
>> ones to the streams to circumvent this. What I am currently missing is the
>> mergeWindows() implementation or the full source code to understand it.
>>
>> I am currently facing a java.lang.IllegalStateException:
>>
>> TimestampCombiner moved element from 2020-10-02T09:32:36.079Z to earlier
>> time 2020-10-02T09:32:03.365Z for window
>> [2020-10-02T09:31:03.366Z..2020-10-02T09:32:03.366Z)
>>
>> Which gives me the impression that I am doing something wrong or have not
>> fully understood the custom windowing topic.
>>
>> Am I on the wrong track here?
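
The values in the exception message can be checked with plain Java (no Beam classes; the class and method names are hypothetical). The sketch below only shows that the element's timestamp lies after the end of the window named in the message, and that the "earlier time" is one millisecond before that window's end. This is consistent with the element having been assigned to a window that does not contain its timestamp, though it does not by itself prove where the assignment went wrong:

```java
import java.time.Instant;

// Plain-Java check of the timestamps quoted in the exception message.
class WindowContainment {

    // True if ts lies in the half-open interval [start, end).
    static boolean contains(Instant start, Instant end, Instant ts) {
        return !ts.isBefore(start) && ts.isBefore(end);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-10-02T09:31:03.366Z");
        Instant end = Instant.parse("2020-10-02T09:32:03.366Z");
        Instant element = Instant.parse("2020-10-02T09:32:36.079Z");

        System.out.println(contains(start, end, element)); // false
        System.out.println(end.minusMillis(1));            // 2020-10-02T09:32:03.365Z
    }
}
```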
