Thanks Reuven.
By "existing windows" I meant the windows that earlier messages in the stream
had already been assigned to. Here is what I tried (this code is derived
from waitingforcode.com):
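import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

// testPipeline (presumably a TestPipeline), log, list2String, DataPerWindowHandler
// and WindowHandlers come from the waitingforcode.com example and are not shown.
// Timestamps are milliseconds since the epoch.
PCollection<String> timestampedLetters =
    testPipeline.apply(Create.timestamped(Arrays.asList(
        TimestampedValue.of("a1", new Instant(1)),
        TimestampedValue.of("a2", new Instant(1)),
        TimestampedValue.of("a3", new Instant(1)),
        TimestampedValue.of("b1", new Instant(2)),
        TimestampedValue.of("c1", new Instant(3)),
        TimestampedValue.of("d1", new Instant(4)),
        TimestampedValue.of("d2", new Instant(4)),
        TimestampedValue.of("a4", new Instant(1)),
        TimestampedValue.of("e1", new Instant(6)),
        TimestampedValue.of("e2", new Instant(8))
    )));
// Sliding windows of size 2 ms, a new one starting every 1 ms.
PCollection<String> windowedLetters = timestampedLetters
    .apply(Window.into(SlidingWindows.of(new Duration(2)).every(new Duration(1))));
// Record which elements end up in which window, then run the pipeline.
windowedLetters.apply(ParDo.of(
    new DataPerWindowHandler(WindowHandlers.SLIDING_TIME_DUPLICATED)));
testPipeline.run().waitUntilFinish();
// Print the collected windows in order.
Map<IntervalWindow, List<String>> itemsPerWindow =
    WindowHandlers.SLIDING_TIME_DUPLICATED.getItemsPerWindow();
itemsPerWindow.keySet().stream().sorted().forEach(i ->
    log.info("instant = {}, values = {}", i, list2String(itemsPerWindow.get(i))));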
When the pipeline finishes, these are the window assignments I see:
[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.002Z), values = [a4, a1, a2, a3]
[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.003Z), values = [a2, a4, a1, b1, a3]
[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.004Z), values = [b1, c1]
[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.005Z), values = [c1, d1, d2]
[1970-01-01T00:00:00.004Z..1970-01-01T00:00:00.006Z), values = [d1, d2]
[1970-01-01T00:00:00.006Z..1970-01-01T00:00:00.008Z), values = [e1]
[1970-01-01T00:00:00.007Z..1970-01-01T00:00:00.009Z), values = [e1, e2]
[1970-01-01T00:00:00.008Z..1970-01-01T00:00:00.010Z), values = [e2]
When element e1 is added, its timestamp T does not fit in any of the
"existing windows", so it is part of only the new window [T-S, T).
If this is how windows are created, how does the "period" come into
play?
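To make the role of the period concrete, here is a minimal plain-Java sketch (no Beam dependency) of per-element sliding-window assignment, assuming offset 0 and millisecond timestamps. As far as I understand Beam's SlidingWindows, window starts fall on multiples of the period, and an element is assigned to every such window of the given size that still covers its timestamp. The windows are computed from the timestamp alone rather than looked up among existing ones, and elements end up together only because equal windows compare equal. The Window class and the assignWindows/groupByWindow names are hypothetical stand-ins, not Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

public class SlidingWindowSketch {
    // Hypothetical half-open window [start, end) in milliseconds; a stand-in for Beam's IntervalWindow.
    public static final class Window implements Comparable<Window> {
        final long start;
        final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Compared by value: two elements that compute the same bounds share a window.
        @Override
        public boolean equals(Object o) {
            return o instanceof Window && ((Window) o).start == start && ((Window) o).end == end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }

        @Override
        public int compareTo(Window other) {
            return start != other.start ? Long.compare(start, other.start) : Long.compare(end, other.end);
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Returns every window of length `size` whose start is a multiple of `period` (offset 0 assumed)
    // and that contains t. Nothing is looked up: the windows are computed from t alone.
    public static List<Window> assignWindows(long t, long size, long period) {
        List<Window> windows = new ArrayList<>();
        long lastStart = t - Math.floorMod(t, period); // latest period boundary at or before t
        for (long start = lastStart; start > t - size; start -= period) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    // Groups values by computed window; a window appears only because some element mapped to it.
    public static Map<Window, List<String>> groupByWindow(
            String[] values, long[] timestamps, long size, long period) {
        Map<Window, List<String>> byWindow = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            for (Window w : assignWindows(timestamps[i], size, period)) {
                byWindow.computeIfAbsent(w, k -> new ArrayList<>()).add(values[i]);
            }
        }
        return byWindow;
    }

    public static void main(String[] args) {
        // Same values and millisecond timestamps as the Create.timestamped(...) input in the test above.
        String[] values = {"a1", "a2", "a3", "b1", "c1", "d1", "d2", "a4", "e1", "e2"};
        long[] timestamps = {1, 1, 1, 2, 3, 4, 4, 1, 6, 8};
        groupByWindow(values, timestamps, 2, 1)
            .forEach((w, vs) -> System.out.println(w + " values = " + vs));
    }
}
```

Running main prints one line per window that received at least one element. Under this arithmetic, timestamp 6 with size 2 and period 1 lands in [5, 7) and [6, 8), while with period 2 it lands only in [6, 8). So the period decides where windows may start and, when it divides the size, each element falls into size/period windows.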
On Tue, Sep 28, 2021 at 9:41 AM Reuven Lax <[email protected]> wrote:
> Mostly accurate, but not 100%
>
> In Beam, windows don't exist a priori. There is no set of existing windows
> that an element is assigned to. A window function assigns an element to
> "windows" (where effectively a window can be considered an opaque id). Beam
> doesn't know too much about these windows, except that it is able to
> compare them for equality (to see if two elements are in the same window)
> and it knows the timestamp when the window ends.
>
> Back to your question: your assessment is mostly correct, except that the
> windows are always created afresh for each element.
>
> Reuven
>
> On Tue, Sep 28, 2021 at 6:26 AM KV 59 <[email protected]> wrote:
>
>> Hi,
>>
>> I have a question about how windows are assigned to elements, or vice
>> versa. (I thought I understood this all along, but I'm a little
>> confused now.)
>>
>> My understanding:
>> For a sliding window configuration of size S and period P
>>
>> 1. Every element E with timestamp T is assigned to each existing
>> window W where
>> W.start() <= T < W.end()
>> 2. If no window with interval [T-S, T) exists, that window is
>> created and the element is assigned to it.
>>
>> Is this an accurate understanding?
>>
>> Appreciate your responses
>>
>> Thanks
>> Kishore
>>
>>