Hi,

I have a simple pipeline:

    stream.aggregateByKey(
        new Initializer<List<Value>>() {
            public List<Value> apply() {
                return new ArrayList<Value>();
            }
        },
        new Aggregator<Key, Value, List<Value>>() {
            public List<Value> apply(Key key, Value value, List<Value> list) {
                list.add(value);
                return list;
            }
        },
        keysSerde, valuesSerde, "table");
This aggregates the values of the source stream into a list per key, and it works fine. However, over time the list will grow very big, so I thought of using a windowed table:

    stream.aggregateByKey(
        new Initializer<List<Value>>() {
            public List<Value> apply() {
                return new ArrayList<Value>();
            }
        },
        new Aggregator<Key, Value, List<Value>>() {
            public List<Value> apply(Key key, Value value, List<Value> list) {
                list.add(value);
                return list;
            }
        },
        TimeWindows.of("table", sizeTS).advanceBy(advanceTS),
        keysSerde, valuesSerde);

It is basically the same code as above, but I find that it aggregates only one value for a given windowed key, so the size of the list is always one.

My understanding is that the source values are put into time buckets based on the timestamps returned by the timestamp extractor. When I check the time windows, I do see each value's timestamp between the window bounds. However, I do not understand why it always aggregates only a single value. Downstream I always get something like

    (key, start, end) -> [value1]
    (key, start, end) -> [value2]

and not

    (key, start, end) -> [value1, value2]

Note that both value1 and value2 fall between the same start and end bounds. In the first (non-windowed) case I get

    key -> [value1, value2]

which is what I expect.

So please let me know if I am missing something in my windowed aggregation, or if there is something else I need to do to get the output I want.

Thanks,
Sachin
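For reference, here is a rough plain-Java sketch (no Kafka classes involved) of the output I expect from both versions. It assumes, based on my understanding of TimeWindows, that hopping windows are aligned to the epoch: each window starts at a multiple of the advance and covers [start, start + size). The class WindowSketch, the Rec type and the sample timestamps are made up for illustration; this only models my expectation and is not how Kafka Streams actually implements the aggregation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the expected output; no Kafka classes are used.
// Assumption: hopping windows are aligned to the epoch, i.e. every window starts at a
// multiple of advanceMs and covers the half-open interval [start, start + sizeMs).
public class WindowSketch {

    // Hypothetical record: key, value and the timestamp the extractor would return.
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Start times of all windows [start, start + sizeMs) that contain ts (negative starts skipped).
    static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start with start > ts - sizeMs, clamped at 0.
        long start = Math.max(0L, ts - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= ts; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Non-windowed case: one list per key (initializer = new list, aggregator = list.add(value)).
    static Map<String, List<String>> aggregate(List<Rec> recs) {
        Map<String, List<String>> table = new LinkedHashMap<>();
        for (Rec r : recs) {
            table.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r.value);
        }
        return table;
    }

    // Windowed case: one list per (key, start, end), same initializer/aggregator logic.
    static Map<String, List<String>> aggregateWindowed(List<Rec> recs, long sizeMs, long advanceMs) {
        Map<String, List<String>> table = new LinkedHashMap<>();
        for (Rec r : recs) {
            for (long start : windowStartsFor(r.ts, sizeMs, advanceMs)) {
                String windowedKey = "(" + r.key + ", " + start + ", " + (start + sizeMs) + ")";
                table.computeIfAbsent(windowedKey, k -> new ArrayList<>()).add(r.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Rec> recs = new ArrayList<>();
        recs.add(new Rec("key", "value1", 1_000L));
        recs.add(new Rec("key", "value2", 2_000L));
        System.out.println(aggregate(recs));                          // {key=[value1, value2]}
        System.out.println(aggregateWindowed(recs, 10_000L, 5_000L)); // {(key, 0, 10000)=[value1, value2]}
    }
}
```

With a size of 10000 ms and an advance of 5000 ms, both sample records (timestamps 1000 and 2000) fall only into the window starting at 0, so I would expect a single list containing both values for that window, the same way the non-windowed version collects both values under the key.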