Hi, I checked the docs http://kafka.apache.org/0100/javadoc/index.html class StreamsConfig but did not find this CACHE_MAX_BYTES_BUFFERING_CONFIG setting.
Also on the first option: use the record cache to dedup messages with the same key before sending downstream I did not understand this. How does one implement this option. Thanks Sachin On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska <eno.there...@gmail.com> wrote: > Hi Sachin, > > If you are using the DSL, currently there is no way to do fine-grained > control of the downstream sending. There is some coarse-grained control in > that you can use the record cache to dedup messages with the same key > before sending downstream, or you can choose to get all records by setting > the cache to 0: > e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0); > > So it looks like you might want to build such logic downstream. > > Thanks > Eno > > > On 1 Dec 2016, at 09:19, Sachin Mittal <sjmit...@gmail.com> wrote: > > > > Hi all, > > Say I have a pipleline like this > > > > topic.aggregateByKey( ...) => to downstream > > > > Now for every message in topic it will call aggregateByKey and send it to > > downstream > > > > Is there a way to tell the pipeline that if it gets a certain message > then > > only push the current aggregation result to downstream. > > > > Or I can do some configuration like until it has aggregated the result > of n > > messages don't push it to downstream. > > > > Or any such logic can only be built in the downstream to check and decide > > if it needs to process the current aggregation or not. > > > > Thanks > > Sachin > >