Hi Sachin,

If you are using the DSL, there is currently no way to exercise fine-grained 
control over when results are sent downstream. There is some coarse-grained 
control: you can use the record cache to dedup messages with the same key 
before they are sent downstream, or you can get every record by setting the 
cache size to 0, e.g.:
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

So it looks like you might want to build such logic downstream.
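
As a rough illustration of what that downstream logic could look like, here is 
a minimal sketch in plain Java. EveryNthGate and shouldProcess are hypothetical 
names I made up for this example; they are not part of the Kafka Streams API. 
The sketch assumes the cache is set to 0, so that every input message produces 
exactly one aggregation update for the downstream code to count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: the downstream code asks it
// whether the current aggregation update for a key should be processed.
class EveryNthGate<K> {
    private final int n;
    // Number of updates seen per key since the last one that was let through.
    private final Map<K, Integer> counts = new HashMap<>();

    EveryNthGate(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
    }

    // Returns true on every n-th update seen for the given key, false otherwise.
    boolean shouldProcess(K key) {
        int seen = counts.merge(key, 1, Integer::sum);
        if (seen < n) {
            return false;
        }
        counts.remove(key); // start counting again for this key
        return true;
    }
}
```

The downstream code would call shouldProcess(key) for each update it receives 
and skip the update when the call returns false. The same shape should work for 
the "certain message" case if you replace the counter with a check on the 
record itself. Note that the counts live only in memory here, so they are lost 
on restart.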


> On 1 Dec 2016, at 09:19, Sachin Mittal <sjmit...@gmail.com> wrote:
> Hi all,
> Say I have a pipeline like this
> topic.aggregateByKey( ...) => to downstream
> Now for every message in topic it will call aggregateByKey and send it to
> downstream
> Is there a way to tell the pipeline that if it gets a certain message then
> only push the current aggregation result to downstream.
> Or I can do some configuration like until it has aggregated the result of n
> messages don't push it to downstream.
> Or any such logic can only be built in the downstream to check and decide
> if it needs to process the current aggregation or not.
> Thanks
> Sachin
