Hi,
I have a simple pipeline:
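stream.aggregateByKey(new Initializer<List<Value>>() {
    public List<Value> apply() {
        // start every key with an empty list
        return new ArrayList<>();
    }
}, new Aggregator<Key, Value, List<Value>>() {
    public List<Value> apply(Key key, Value value, List<Value> list) {
        // append each incoming value to the list for its key
        list.add(value);
        return list;
    }
}, keysSerde, valuesSerde, "table");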

This aggregates the values of a source stream into a list per key.
It works fine.

However, over time the list will grow very large, so I thought of using
a windowed table:

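stream.aggregateByKey(new Initializer<List<Value>>() {
    public List<Value> apply() {
        // start every windowed key with an empty list
        return new ArrayList<>();
    }
}, new Aggregator<Key, Value, List<Value>>() {
    public List<Value> apply(Key key, Value value, List<Value> list) {
        // append each incoming value to the list for its windowed key
        list.add(value);
        return list;
    }
}, TimeWindows.of("table", sizeTS).advanceBy(advanceTS), keysSerde,
valuesSerde);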

This is essentially the same code as above, but I find that it aggregates
only one value for a given windowed key, so the size of the list is always
one.

My understanding is that it puts the source values into time buckets based
on the timestamp returned by the timestamp extractor. When I check a time
window, I see that the value's timestamp lies between the bounds of that
window.
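
To make my understanding of the bucketing concrete, here is a minimal plain-Java sketch of how I believe a timestamp maps to hopping windows. It does not call Kafka Streams at all. The class and method names (HoppingWindowSketch, windowStartsFor) are made up for illustration, and I am assuming windows are aligned to epoch 0 and are start-inclusive, end-exclusive.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowSketch {

    // Returns the start times of all windows [start, start + sizeMs) that
    // contain the timestamp. Assumption: windows are aligned to epoch 0.
    public static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start that can still cover the timestamp.
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestamp) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // size 10, advance 5: timestamp 12 falls into [5, 15) and [10, 20)
        System.out.println(windowStartsFor(12L, 10L, 5L)); // prints [5, 10]
    }
}
```

So with an advance smaller than the size, I would expect a single value to appear under more than one windowed key, and two values with nearby timestamps to share at least one window.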

However, I do not understand why it always aggregates only a single
value.

Downstream, I always get something like

(key, start, end) -> [value1]
(key, start, end) -> [value2]

and not

(key, start, end) -> [value1, value2]

Note that both value1 and value2 fall between the start and end bounds.

In the first (non-windowed) case, however, I get

key -> [value1, value2]

which is what I expect.
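
To spell out the output I expect from the windowed case, here is a small plain-Java simulation. It is only a sketch: it uses an in-memory map instead of a Kafka Streams table, assumes tumbling windows aligned to epoch 0 for simplicity, and the names (ExpectedWindowedAggregation, add) and the timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedWindowedAggregation {

    // Appends the value to the list stored under (key, start, end).
    // Assumption: tumbling windows of sizeMs aligned to epoch 0.
    public static void add(Map<String, List<String>> table, String key,
                           String value, long timestamp, long sizeMs) {
        long start = timestamp / sizeMs * sizeMs;
        String windowedKey = "(" + key + ", " + start + ", " + (start + sizeMs) + ")";
        table.computeIfAbsent(windowedKey, k -> new ArrayList<>()).add(value);
    }

    public static void main(String[] args) {
        Map<String, List<String>> table = new LinkedHashMap<>();
        // Two values for the same key, both inside the window [0, 10000)
        add(table, "key", "value1", 1_000L, 10_000L);
        add(table, "key", "value2", 4_000L, 10_000L);
        System.out.println(table); // prints {(key, 0, 10000)=[value1, value2]}
    }
}
```

That is, values that share a key and a window should end up in the same list, rather than each producing its own single-element list.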

Please let me know if I am missing something in my windowed aggregation,
or if something else needs to be done to get the output I want.

Thanks
Sachin
