I checked the docs
http://kafka.apache.org/0100/javadoc/index.html class StreamsConfig but did
not find this CACHE_MAX_BYTES_BUFFERING_CONFIG setting.
Also on the first option:
use the record cache to dedup messages with the same key before sending
I did not understand this. How does one implement this option.
On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> Hi Sachin,
> If you are using the DSL, currently there is no way to do fine-grained
> control of the downstream sending. There is some coarse-grained control in
> that you can use the record cache to dedup messages with the same key
> before sending downstream, or you can choose to get all records by setting
> the cache to 0:
> e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> So it looks like you might want to build such logic downstream.
> > On 1 Dec 2016, at 09:19, Sachin Mittal <sjmit...@gmail.com> wrote:
> > Hi all,
> > Say I have a pipleline like this
> > topic.aggregateByKey( ...) => to downstream
> > Now for every message in topic it will call aggregateByKey and send it to
> > downstream
> > Is there a way to tell the pipeline that if it gets a certain message
> > only push the current aggregation result to downstream.
> > Or I can do some configuration like until it has aggregated the result
> of n
> > messages don't push it to downstream.
> > Or any such logic can only be built in the downstream to check and decide
> > if it needs to process the current aggregation or not.
> > Thanks
> > Sachin