Thanks Reuven.

By "existing windows" I meant windows that earlier messages in the stream
had already been assigned to. Here is what I tried (this code is derived
from waitingforcode.com):

PCollection<String> timestampedLetters =
    testPipeline.apply(Create.timestamped(Arrays.asList(
        TimestampedValue.of("a1", new Instant(1)),
        TimestampedValue.of("a2", new Instant(1)),
        TimestampedValue.of("a3", new Instant(1)),
        TimestampedValue.of("b1", new Instant(2)),
        TimestampedValue.of("c1", new Instant(3)),
        TimestampedValue.of("d1", new Instant(4)),
        TimestampedValue.of("d2", new Instant(4)),
        TimestampedValue.of("a4", new Instant(1)),
        TimestampedValue.of("e1", new Instant(6)),
        TimestampedValue.of("e2", new Instant(8)))));

// Sliding windows of size 2 ms that start every 1 ms (the period).
PCollection<String> windowedLetters = timestampedLetters
    .apply(Window.into(SlidingWindows.of(new Duration(2)).every(new Duration(1))));
// DataPerWindowHandler and WindowHandlers come from the waitingforcode.com
// example (not Beam); they record which elements each window received.
windowedLetters.apply(ParDo.of(
    new DataPerWindowHandler(WindowHandlers.SLIDING_TIME_DUPLICATED)));

testPipeline.run().waitUntilFinish();

// Print the collected windows in order, with the elements assigned to each.
Map<IntervalWindow, List<String>> itemsPerWindow =
    WindowHandlers.SLIDING_TIME_DUPLICATED.getItemsPerWindow();
itemsPerWindow.keySet().stream().sorted().forEach(i ->
    log.info("instant = {}, values = {}", i, list2String(itemsPerWindow.get(i))));


When the pipeline finishes, these are the window assignments I see:

[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.002Z), values = [a4, a1, a2, a3]
[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.003Z), values = [a2, a4, a1, b1, a3]
[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.004Z), values = [b1, c1]
[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.005Z), values = [c1, d1, d2]
[1970-01-01T00:00:00.004Z..1970-01-01T00:00:00.006Z), values = [d1, d2]
[1970-01-01T00:00:00.006Z..1970-01-01T00:00:00.008Z), values = [e1]
[1970-01-01T00:00:00.007Z..1970-01-01T00:00:00.009Z), values = [e1, e2]
[1970-01-01T00:00:00.008Z..1970-01-01T00:00:00.010Z), values = [e2]
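The listing can be cross-checked against a small plain-Java model in which every element computes its own windows and grouping relies only on window equality. This is a sketch, not Beam's implementation: `Window`, `assign`, and `group` are hypothetical names, the window offset is assumed to be 0, and the assignment rule (every period-aligned start in (T - S, T]) is an assumption about how SlidingWindows behaves. Records require Java 16+.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SlidingAssignmentSketch {

    // Hypothetical stand-in for Beam's IntervalWindow: the half-open range
    // [start, end) in milliseconds. A record provides value equality and hashing.
    public record Window(long start, long end) implements Comparable<Window> {
        @Override
        public int compareTo(Window o) {
            return start != o.start ? Long.compare(start, o.start) : Long.compare(end, o.end);
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Assigns one timestamp to its sliding windows without looking at any other
    // element or any "existing" window (offset assumed to be 0).
    public static List<Window> assign(long t, long size, long period) {
        List<Window> windows = new ArrayList<>();
        long lastStart = t - Math.floorMod(t, period); // latest period-aligned start <= t
        for (long start = lastStart; start > t - size; start -= period) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    // Groups elements by window: equal windows computed independently for
    // different elements collapse into a single map key.
    public static Map<Window, List<String>> group(String[] names, long[] times,
                                                  long size, long period) {
        Map<Window, List<String>> byWindow = new TreeMap<>();
        for (int i = 0; i < names.length; i++) {
            for (Window w : assign(times[i], size, period)) {
                byWindow.computeIfAbsent(w, k -> new ArrayList<>()).add(names[i]);
            }
        }
        return byWindow;
    }

    public static void main(String[] args) {
        // Same elements and millisecond timestamps as the Create.timestamped input.
        String[] names = {"a1", "a2", "a3", "b1", "c1", "d1", "d2", "a4", "e1", "e2"};
        long[] times = {1, 1, 1, 2, 3, 4, 4, 1, 6, 8};
        group(names, times, 2, 1).forEach((w, v) -> System.out.println(w + " " + v));
    }
}
```

Element order inside a window is not meaningful in Beam, so [a4, a1, a2, a3] and [a1, a2, a3, a4] are equivalent. With e1 at timestamp 6, as in the code, this model puts e1 into [5, 7) and [6, 8); it reproduces the listing's [6, 8) = [e1] and [7, 9) = [e1, e2] only if e1 is stamped 7, so the listing may come from a run with a different timestamp for e1.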

When element e1 is added, its timestamp T does not fit in any of the
"existing" windows, so it becomes part of only the new window [T-S, T).

If this is how windows are created, how does the "period" come into
play?
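To see where the period enters, the same assumed assignment rule can be run for a single timestamp with different sizes and periods. Under this sketch's assumptions (offset 0, half-open windows, hypothetical `Window` and `assign` names), the period fixes where windows may start (multiples of P) and therefore how many windows cover each element (S / P when P divides S). The windows containing T are the ones whose start lies in (T - S, T], rather than a single window [T - S, T).

```java
import java.util.ArrayList;
import java.util.List;

public class PeriodSketch {

    // Hypothetical half-open window [start, end) in milliseconds.
    public record Window(long start, long end) {
        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Assumed sliding-window rule with offset 0:
    // one window for every period-aligned start in (t - size, t].
    public static List<Window> assign(long t, long size, long period) {
        List<Window> windows = new ArrayList<>();
        for (long start = t - Math.floorMod(t, period); start > t - size; start -= period) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        long t = 6; // timestamp of e1 in the test input
        long[][] configs = {{2, 1}, {2, 2}, {4, 1}, {4, 2}}; // {size, period} in millis
        for (long[] c : configs) {
            System.out.println("size=" + c[0] + " period=" + c[1] + " -> " + assign(t, c[0], c[1]));
        }
    }
}
```

With size 2 and period 1, a timestamp of 6 falls into [6, 8) and [5, 7); with period 2 it falls only into [6, 8). Doubling the size to 4 doubles the number of windows per element for the same period.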




On Tue, Sep 28, 2021 at 9:41 AM Reuven Lax wrote:

> Mostly accurate, but not 100%.
>
> In Beam, windows don't exist a priori. There is no set of existing windows
> that an element is assigned to. A window function assigns an element to
> "windows" (where effectively a window can be considered an opaque id). Beam
> doesn't know too much about these windows, except that it is able to
> compare them for equality (to see if two elements are in the same window)
> and it knows the timestamp when the window ends.
>
> Back to your question: your assessment is mostly correct, except that the
> windows are always created anew each time.
>
> Reuven
>
> On Tue, Sep 28, 2021 at 6:26 AM KV 59 wrote:
>
>> Hi,
>>
>> I have a question about how windows are assigned to elements, or vice
>> versa. (I thought I understood this all along, but I'm a little
>> confused now.)
>>
>> My understanding:
>> For a sliding window configuration of size S and period P
>>
>> 1. Every element E with timestamp T is assigned to each existing
>> window W where
>>     W.start() <= T < W.end()
>> 2. If no window with interval [T-S, T) exists, that window is created and
>> the element is assigned to it.
>>
>> Is this an accurate understanding?
>>
>> Appreciate your responses
>>
>> Thanks
>> Kishore
>>
>>
