Hi Sachin,

You can achieve what you want by setting the correct cleanup.policy on these
topics. In this case you want cleanup.policy=compact,delete; you'll also want
to set retention.ms and/or retention.bytes.
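For example (an untested sketch: the AdminClient API used here ships with
newer Kafka clients (0.11+) than the 0.10.x in this thread, where
kafka-configs.sh would set the same configs; the broker address, topic name,
and 7-day retention below are placeholders):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetChangelogCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            // Placeholder topic name; Streams changelogs are named
            // <application.id>-<store-name>-changelog.
            ConfigResource topic = new ConfigResource(
                    ConfigResource.Type.TOPIC, "my-app-my-store-changelog");

            // compact,delete (KIP-71): the log cleaner compacts the topic AND
            // deletes segments older than retention.ms (or beyond retention.bytes).
            Config config = new Config(Arrays.asList(
                    new ConfigEntry("cleanup.policy", "compact,delete"),
                    new ConfigEntry("retention.ms",
                            String.valueOf(7L * 24 * 60 * 60 * 1000)))); // 7 days, example

            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}
```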
The topic will then be compacted, but segments will also be deleted based on
the retention settings. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
for further details.

HTH,
Damian

On Thu, 10 Nov 2016 at 11:17 Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> As per Eno's suggestion I have pre-created the internal changelog topics
> with an increased max.message.bytes config, to handle big messages that
> grow over time.
>
> As Matthias has pointed out, we cannot use the retention.ms setting to
> delete older message data after a given time, so is there a way to purge
> older messages from my changelog topic?
>
> Remember my changelog topic is
> key=list of objects
> and this grows with time.
>
> So I would like these to be deleted from time to time, because I will
> already have consumed the objects, so that key/value can be deleted.
> Later, if I get a new object for the same key, that is a new message and
> the old data has no use for the streaming application.
>
> So how can I achieve this? Would retention.bytes help here?
>
> Is there a way I can set something like "expire after" at the message
> level, so that some Kafka thread would purge those messages?
>
> Thanks
> Sachin
>
>
> On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > My two cents:
> >
> > Changelog topics are compacted topics, thus they do not have a
> > retention time (there is an exception for windowed KTable changelog
> > topics, which are compacted and do have a retention time).
> >
> > However, I do not understand how changing the retention time would
> > fix the issue. If your list of values grows and exceeds
> > max.message.bytes, you will need to increase this parameter (or
> > shrink your value).
> >
> > Besides this, Eno's answer is the way to go. In order to figure out
> > internal topic names, you can use KafkaStreams#toString().
> >
> > -Matthias
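(For illustration, a sketch of discovering internal topic names by printing
the topology, as Matthias suggests above. This uses the newer StreamsBuilder
API; on the 0.10.x API current in this thread, KafkaStreams#toString() serves
the same purpose. The topic and store names below are placeholders, not from
the thread.)

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class PrintInternalTopics {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Placeholder aggregation: the store name "agg-store" determines the
        // changelog topic name, <application.id>-agg-store-changelog.
        KStream<String, String> input = builder.stream(
                "input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        input.groupByKey()
             .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("agg-store"));

        // describe() prints the topology, including state store names, from
        // which the internal changelog topic names follow.
        System.out.println(builder.build().describe());
    }
}
```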
> > On 11/8/16 11:14 AM, Eno Thereska wrote:
> > > Hi Sachin,
> > >
> > > One option right now would be to pre-create all internal topics in
> > > Kafka, and only after that start the Kafka Streams application. This
> > > would require you knowing the internal names of the topics (in this
> > > case you probably already know them, but I agree that in general
> > > this is a bit cumbersome).
> > >
> > > Eno
> > >
> > >> On 8 Nov 2016, at 18:10, Sachin Mittal <sjmit...@gmail.com> wrote:
> > >>
> > >> Per-message payload size. The basic question is how I can control
> > >> the parameters of the internal changelog topics so as to avoid
> > >> these errors.
> > >>
> > >> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna <krishna...@gmail.com>
> > >> wrote:
> > >>
> > >>> Are you talking about total messages, and therefore size, or
> > >>> per-message payload size?
> > >>>
> > >>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
> > >>> <sjmit...@gmail.com> wrote:
> > >>>
> > >>>> The message size itself increases over time.
> > >>>>
> > >>>> A message is something like key=[list of objects].
> > >>>>
> > >>>> This grows with time, and then at some point Kafka is not able
> > >>>> to add the message to its topic because the message size is
> > >>>> greater than max.message.bytes. Since this is an internal topic
> > >>>> based off a table, I don't know how I can control this topic.
> > >>>>
> > >>>> If I can set some retention.ms for this topic then I can purge
> > >>>> old messages, thereby ensuring that the message size stays
> > >>>> within the limit.
> > >>>>
> > >>>> Thanks
> > >>>> Sachin
> > >>>>
> > >>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
> > >>>> <eno.there...@gmail.com> wrote:
> > >>>>
> > >>>>> Hi Sachin,
> > >>>>>
> > >>>>> Could you clarify what you mean by "message size increases"?
> > >>>>> Are messages going to the changelog topic increasing in size?
> > >>>>> Or is the changelog topic getting full?
> > >>>>>
> > >>>>> Thanks
> > >>>>> Eno
> > >>>>>
> > >>>>>> On 8 Nov 2016, at 16:49, Sachin Mittal <sjmit...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>> Hi,
> > >>>>>> We are using aggregation by key on a KStream to create a
> > >>>>>> KTable. As I read from
> > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
> > >>>>>> it creates an internal changelog topic.
> > >>>>>>
> > >>>>>> However, as the streaming application runs over time, the
> > >>>>>> message size increases and it starts throwing a
> > >>>>>> max.message.bytes exception.
> > >>>>>>
> > >>>>>> Is there a way to control the retention.ms time for internal
> > >>>>>> changelog topics so that messages are purged before they
> > >>>>>> exceed this size?
> > >>>>>>
> > >>>>>> If not, is there a way to control or avoid such an error?
> > >>>>>>
> > >>>>>> Thanks
> > >>>>>> Sachin
> > >>>
> > >>> --
> > >>> Radha Krishna, Proddaturi
> > >>> 253-234-5657
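(For illustration of Eno's pre-creation suggestion above, a sketch using the
Java AdminClient. The broker address, topic name, partition count,
replication factor, and config values are all placeholder assumptions.)

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class PrecreateChangelogTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker

        Map<String, String> configs = new HashMap<>();
        configs.put("max.message.bytes", "10485760"); // 10 MB, example value
        configs.put("cleanup.policy", "compact,delete");
        configs.put("retention.ms", String.valueOf(7L * 24 * 60 * 60 * 1000));

        // Placeholder name/partitions/replication; a changelog topic must have
        // the same number of partitions as the application's input topic.
        NewTopic changelog =
                new NewTopic("my-app-my-store-changelog", 4, (short) 2).configs(configs);

        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singleton(changelog)).all().get();
        }
    }
}
```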