Hi,
I have a simple pipeline:
stream.aggregateByKey(new Initializer<List<Value>>() {
    public List<Value> apply() {
        return new ArrayList<>();
    }
}, new Aggregator<Key, Value, List<Value>>() {
    public List<Value> apply(Key key, Value value, List<Value> list) {
        list.add(value);
        return list;
    }
}, keysSerde, valuesSerde, "table");
This aggregates the values of a source stream into one list per key.
This is working fine.
However, over time the list will grow very large, so I thought of using a
windowed table:
stream.aggregateByKey(new Initializer<List<Value>>() {
    public List<Value> apply() {
        return new ArrayList<>();
    }
}, new Aggregator<Key, Value, List<Value>>() {
    public List<Value> apply(Key key, Value value, List<Value> list) {
        list.add(value);
        return list;
    }
}, TimeWindows.of("table", sizeTS).advanceBy(advanceTS), keysSerde,
valuesSerde);
It is basically the same code as above, but I find that it aggregates only
one value for a given windowed key, so the size of the list is always one.
My understanding is that each source value is put into a time bucket based
on the timestamp returned by the timestamp extractor. When I check the time
window, I see that the value's timestamp lies between the bounds of the window.
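
To make that understanding concrete, here is a minimal sketch in plain Java (no Kafka classes) of how I assume a timestamp maps to hopping windows. The class and method names are made up for illustration, and it assumes windows are aligned to the epoch and cover [start, start + size). With an advance smaller than the size, one value should land in several overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not the Kafka Streams implementation.
final class HoppingWindowSketch {

    // Returns the start timestamps of every window [start, start + sizeMs)
    // that contains timestampMs. ASSUMPTION: window starts are non-negative
    // multiples of advanceMs (epoch-aligned).
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest epoch-aligned window that can still contain the timestamp.
        long first = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size = 10s, advance = 5s: timestamp 12000 falls into the windows
        // starting at 5000 and at 10000.
        System.out.println(windowStartsFor(12_000L, 10_000L, 5_000L));
    }
}
```

So for a given key I would expect one entry per overlapping window, not one entry per value.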
However, I do not understand why it always aggregates only a single
value.
So downstream I always get something like
(key, start, end) -> [value1]
(key, start, end) -> [value2]
and not
(key, start, end) -> [value1, value2]
Note that the timestamps of both value1 and value2 are between the start and end bounds.
However, in the first (non-windowed) case I get
key -> [value1, value2], which is what I expect.
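
As a reference for the behaviour I expect from the windowed case, here is a small in-memory simulation in plain Java. It is only a sketch: the Record class, the "key@windowStart" map key and the epoch-aligned [start, start + size) windows are my own assumptions for illustration, not Kafka Streams classes or its actual storage format.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; simulates the expected result, not Kafka Streams.
final class WindowedListAggregationSketch {

    // Hypothetical record type: a key, a value and the extracted timestamp.
    static final class Record {
        final String key;
        final String value;
        final long timestampMs;

        Record(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Appends each value to the list of every window that contains its timestamp.
    // The result maps "key@windowStart" to the values aggregated for that window.
    static Map<String, List<String>> aggregate(List<Record> records, long sizeMs, long advanceMs) {
        Map<String, List<String>> table = new LinkedHashMap<>();
        for (Record r : records) {
            // ASSUMPTION: window starts are non-negative multiples of advanceMs.
            long first = Math.max(0L, r.timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
            for (long start = first; start <= r.timestampMs; start += advanceMs) {
                table.computeIfAbsent(r.key + "@" + start, k -> new ArrayList<>()).add(r.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Record> records = new ArrayList<>();
        records.add(new Record("key", "value1", 11_000L));
        records.add(new Record("key", "value2", 13_000L));
        // size = 10s, advance = 5s: both timestamps fall into the same two windows.
        System.out.println(aggregate(records, 10_000L, 5_000L));
    }
}
```

In this simulation both windows (starting at 5000 and 10000) end up holding [value1, value2], which is the shape of output I was hoping to see from the windowed table.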
Please let me know if I am missing something in my windowed aggregation,
or if there is something else I need to do to get the output I want.
Thanks
Sachin