Given: Two streams of events, A and B, each containing events for a given
key K. Each event in A has a timestamp a little later than the corresponding
event in B, but never more than 1 minute later.

Question: When two PCollections are windowed by session windows with a gap
duration of 10 minutes and CoGroupByKey is applied to join A with B, should
the session windows then overlap?
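
A simplified, standalone model (not Beam code) may help with this question. Beam's Sessions window function gives each element a proto-window [timestamp, timestamp + gap) and merges overlapping windows per key. As far as I know, the Java SDK's CoGroupByKey flattens its tagged inputs and runs a single GroupByKey, so the merge sees elements from A and B together: for the same key, overlapping sessions from the two streams are combined into one window rather than kept side by side. The class and method names below are hypothetical, timestamps are plain epoch milliseconds, and the handling of exactly adjacent windows may differ from Beam's.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Simplified model of session merging for ONE key (hypothetical, not Beam code).
 * Each element gets a proto-window [ts, ts + gap); overlapping proto-windows are
 * merged into one session, no matter which input stream the element came from.
 */
final class SessionMergeModel {

    /** Returns merged sessions as {start, end} pairs (end exclusive), sorted by start. */
    static List<long[]> mergeSessions(List<Long> timestampsMillis, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestampsMillis);
        Collections.sort(sorted);
        List<long[]> sessions = new ArrayList<>();
        for (long ts : sorted) {
            long[] proto = {ts, ts + gapMillis};
            if (!sessions.isEmpty()) {
                long[] last = sessions.get(sessions.size() - 1);
                if (proto[0] < last[1]) {
                    // Overlaps the current session: extend it instead of opening a new one.
                    last[1] = Math.max(last[1], proto[1]);
                    continue;
                }
            }
            sessions.add(proto);
        }
        return sessions;
    }
}
```

With a 10-minute gap, an event from B at 0 ms and an event from A 30 seconds later end up in a single session, while an event 30 minutes later starts a new one.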

On Mon, Oct 5, 2020 at 11:10 AM Kaymak, Tobias <[email protected]>
wrote:

> Hi Reuven,
> Thank you for your response.
>
> Yes, I've tested session windows with a gap of 10 minutes, as I thought
> this should work in this scenario.
> However, I found that I still had articles/assets where the watermark
> might have been wrong. As I am relying on the assets being processed first
> (inserted into BigTable, followed by a fetch from BigTable for the whole
> history), I tried the following workaround, which works (it ran over the
> weekend for 72 hours of testing):
>
> As I'm reading from KafkaIO, I am using a custom
> .withTimestampPolicyFactory(withEventTs) for the assets, in which I simply
> set a timestamp that is 1 minute earlier than the element's event
> timestamp (this is my allowed gap).
> The rest of the pipeline stays as-is, so the operational and logical
> overhead is kept to a minimum.
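
The timestamp shift described above comes down to a subtraction. The sketch below is a standalone illustration using java.time; Beam's Java SDK works with Joda-Time Instant, and the withEventTs factory from the thread is not shown, so none of its code is reproduced here. The class name and the MAX_ASSET_LAG constant are hypothetical. Assuming assets lag their article by at most one minute, the shifted asset timestamp is never later than the article's.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical sketch of the "move assets 1 minute earlier" workaround (not KafkaIO code).
 * Assumption: an asset's event time is at most MAX_ASSET_LAG after its article's.
 */
final class AssetTimestampShift {
    static final Duration MAX_ASSET_LAG = Duration.ofMinutes(1);

    /** Timestamp to assign to an asset: its event time moved back by the allowed lag. */
    static Instant shiftedAssetTimestamp(Instant assetEventTime) {
        return assetEventTime.minus(MAX_ASSET_LAG);
    }

    /** True if the shifted asset timestamp is not later than its article's timestamp. */
    static boolean assetNotAfterArticle(Instant articleEventTime, Instant assetEventTime) {
        return !shiftedAssetTimestamp(assetEventTime).isAfter(articleEventTime);
    }
}
```

If the lag bound is ever exceeded, the shifted asset lands after the article again, so the one-minute value is only as safe as the assumption behind it.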
>
> On Fri, Oct 2, 2020 at 9:40 PM Reuven Lax <[email protected]> wrote:
>
>> Have you considered using Session windows? The window would start at the
>> timestamp of the article, and the Session gap duration would be the
>> (event-time) timeout after which you stop waiting for assets to join that
>> article.
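
To make the "timeout" in this suggestion concrete, here is a hypothetical standalone model for a single article key (plain epoch milliseconds, not Beam code). It assumes, as in this thread, that assets are never earlier than their article. Every asset that arrives within the gap extends the session, so the effective wait is measured from the most recent element rather than from the article itself.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of one article's session (not Beam code).
 * The session starts at the article's timestamp; each asset less than `gapMillis`
 * after the previous element joins it and pushes the session end further out.
 */
final class ArticleSessionJoin {

    /** Returns the assets (input sorted ascending) that fall into the article's session. */
    static List<Long> assetsInArticleSession(long articleTs, List<Long> sortedAssetTs, long gapMillis) {
        List<Long> joined = new ArrayList<>();
        long sessionEnd = articleTs + gapMillis; // exclusive end of the session so far
        for (long ts : sortedAssetTs) {
            if (ts < articleTs) {
                throw new IllegalArgumentException("model assumes assets are not earlier than the article");
            }
            if (ts >= sessionEnd) {
                break; // gap exceeded: this asset and all later ones miss the article's session
            }
            joined.add(ts);
            sessionEnd = ts + gapMillis; // input is sorted, so this only moves the end forward
        }
        return joined;
    }
}
```

With a 10-minute gap, an asset 700 seconds after the article still joins here because an earlier asset at 400 seconds kept the session open.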
>>
>> On Fri, Oct 2, 2020 at 3:05 AM Kaymak, Tobias <[email protected]>
>> wrote:
>>
>>> Hello,
>>>
>>> In chapter 4 of the Streaming Systems book by Tyler, Slava and Reuven,
>>> Example 4-6 on page 111 shows custom windowing with
>>> UnalignedFixedWindows:
>>>
>>> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>>>
>>> Unfortunately, that example is abbreviated, and the full source code is
>>> not published in this repo:
>>> https://github.com/takidau/streamingbook
>>>
>>> I am joining two Kafka streams and I am currently windowing them by
>>> fixed time intervals. However, the elements in stream one ("articles") are
>>> published first, and then the assets for those articles are published in
>>> the "assets" topic. The articles' event timestamps are therefore slightly
>>> earlier than those of the assets.
>>>
>>> Now, when doing a CoGroupByKey, this can lead to a situation where an
>>> article is not processed together with its assets because the two fall
>>> into different fixed windows, for example:
>>>
>>> - the article has a timestamp of 2020-10-02T00:30:29.997Z
>>> - the assets have a timestamp of 2020-10-02T00:30:30.001Z
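
The thread does not say which fixed window size is used. Assuming, purely for illustration, 30-second windows aligned to the epoch (Beam's FixedWindows with no offset aligns the same way), the two timestamps above land in different windows, while with 1-minute windows they would share one. The class below is a hypothetical standalone model, not Beam code.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of epoch-aligned fixed-window assignment (not Beam code). */
final class FixedWindowModel {

    /** Inclusive start of the fixed window of the given size that contains ts. */
    static Instant windowStart(Instant ts, Duration size) {
        long ms = ts.toEpochMilli();
        long sizeMs = size.toMillis();
        // floorMod keeps the remainder non-negative, so start <= ts < start + size.
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
    }
}
```

Any window size whose boundaries include 00:30:30 splits this pair; a different size only moves the boundary elsewhere, so some other article/asset pair can still straddle it.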
>>>
>>> Processing them together is a must in my pipeline; otherwise I publish
>>> an article without its assets.
>>>
>>> My idea was therefore to apply UnalignedFixedWindows instead of fixed
>>> ones to the streams to circumvent this. What I am currently missing is the
>>> mergeWindows() implementation or the full source code to understand it.
>>> I am currently facing a java.lang.IllegalStateException:
>>>
>>> TimestampCombiner moved element from 2020-10-02T09:32:36.079Z to earlier
>>> time 2020-10-02T09:32:03.365Z for window
>>> [2020-10-02T09:31:03.366Z..2020-10-02T09:32:03.366Z)
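
One reading of the numbers in this message: the "earlier time" 2020-10-02T09:32:03.365Z is exactly one millisecond before the window's end, which matches an end-of-window output timestamp (Beam's default TimestampCombiner is END_OF_WINDOW, if I recall correctly), and the element at 09:32:36.079Z lies after that end. If that reading is right, the custom window function assigned the element to a window that does not contain it. The hypothetical check below, a standalone java.time sketch, encodes the invariant start <= timestamp < end and reproduces that arithmetic.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical standalone check of the window-assignment invariant (not Beam code). */
final class WindowInvariantCheck {

    /** An assigned window [start, end) must contain the element's timestamp. */
    static boolean contains(Instant start, Instant end, Instant ts) {
        return !ts.isBefore(start) && ts.isBefore(end);
    }

    /** End-of-window output time: the last millisecond inside [start, end). */
    static Instant endOfWindowOutputTime(Instant end) {
        return end.minus(Duration.ofMillis(1));
    }
}
```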
>>>
>>> This gives me the impression that I am doing something wrong or have
>>> not fully understood custom windowing.
>>>
>>> Am I on the wrong track here?
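
Since the book's Example 4-6 is abbreviated, the following is a hypothetical sketch of per-key "unaligned" fixed-window assignment, not the book's code: each key gets an offset in [0, size), and the window start is chosen so that start <= timestamp < start + size always holds. Math.floorMod is used because Java's % keeps the sign of the left operand (for example, -7 % 60000 is -7), and hashCode() can be negative. Only the assignment step is modeled; within one key, the windows produced this way never overlap.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical per-key offset ("unaligned") fixed windows (not the book's Example 4-6).
 * Window boundaries differ between keys but are fixed for any single key.
 */
final class UnalignedFixedWindowModel {

    /** Per-key offset in [0, sizeMillis), non-negative even for negative hash codes. */
    static long perKeyOffset(Object key, long sizeMillis) {
        return Math.floorMod((long) key.hashCode(), sizeMillis);
    }

    /** Returns {start, end} (end exclusive) of the window for this key that contains ts. */
    static Instant[] assignWindow(Object key, Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long offset = perKeyOffset(key, sizeMs);
        long ms = ts.toEpochMilli();
        // start is congruent to offset (mod size) and satisfies start <= ms < start + size.
        long start = ms - Math.floorMod(ms - offset, sizeMs);
        return new Instant[] {Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + sizeMs)};
    }
}
```

Note that an article and its assets share the same key and therefore the same offset in this sketch, so their window boundaries are still identical: unaligned windows stagger boundaries across keys but, on their own, do not keep a same-key pair that straddles a boundary together.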
