Hi Sachin,

You can achieve what you want by setting the correct cleanup.policy on
these topics.
In this case you want cleanup.policy=compact,delete - you'll also want to
set retention.ms and/or retention.bytes.

The topic will then be compacted, but it will also delete any segments
based on the retention settings.
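
For example, altering an existing topic could look roughly like the sketch
below. This assumes a kafka-clients version that ships the Java AdminClient;
the topic name, broker address and retention values are placeholders you'd
replace with your own (the same overrides can also be applied with the
kafka-configs.sh tool).

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class CompactAndDelete {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Placeholder name for the changelog topic you want to change.
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "my-changelog-topic");

            Config config = new Config(Arrays.asList(
                new ConfigEntry("cleanup.policy", "compact,delete"),
                new ConfigEntry("retention.ms", "604800000"),        // 7 days
                new ConfigEntry("retention.bytes", "1073741824")));  // 1 GB per partition

            // Replaces the topic-level config overrides with the entries above.
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}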

You can look here for further details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist

HTH,
Damian

On Thu, 10 Nov 2016 at 11:17 Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> As per Eno's suggestion I have pre-created the internal changelog topics with
> an increased max.message.bytes config to handle big messages that grow
> over time.
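
To make that pre-creation step concrete, a rough sketch is below, again
assuming a kafka-clients version that ships the Java AdminClient. The topic
name, partition count, replication factor and size/retention values are
placeholders; the changelog name follows the
<application.id>-<store name>-changelog pattern, and the partition count has
to match what Streams expects.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class PreCreateChangelog {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", "compact,delete");
            configs.put("max.message.bytes", "10485760");  // 10 MB, placeholder
            configs.put("retention.ms", "604800000");      // 7 days, placeholder

            // Placeholder changelog name; the partition count must match the
            // partitions Streams expects for this store.
            NewTopic changelog =
                new NewTopic("my-app-my-store-changelog", 4, (short) 1).configs(configs);

            admin.createTopics(Collections.singleton(changelog)).all().get();
        }
    }
}
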
>
> As Matthias has pointed out, we cannot use the retention.ms setting to delete
> older message data after a given time, so is there a way to purge older
> messages from my changelog topic?
>
> Remember my changelog topic is
> key=list of objects
> and this grows with time.
>
> So I would like these to be deleted from time to time, because I will have
> already consumed the objects, so that the key/value can be deleted.
> If I later get a new object for the same key, then that's a new message and
> the old data has no use for the streaming application.
>
> So how can I achieve this?
> Would retention.bytes help here?
>
> Is there a way to set something like an expire-after at the message level, so
> that some Kafka thread would purge those messages?
>
> Thanks
> Sachin
>
>
>
> On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> >
> > My two cents:
> >
> > Changelog topics are compacted topics, thus they do not have a
> > retention time (there is an exception for windowed KTable changelog
> > topics, which are compacted and do have a retention time though).
> >
> > However, I do not understand how changing the retention time should fix
> > the issue. If your list of values grows and exceeds max.message.bytes,
> > you will need to increase this parameter (or shrink your value).
> >
> > Besides this, Eno's answer is the way to go. In order to figure out
> > internal topic names, you can use KafkaStreams#toString().
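
As a rough sketch of that, against the 0.10.x Streams API (the application id,
bootstrap servers, input topic and store name below are placeholders):

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PrintInternalTopics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        // Any aggregation creates a state store; its changelog topic name
        // shows up in the KafkaStreams#toString() output.
        builder.stream("input-topic")
               .groupByKey()
               .count("counts-store");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // Prints the topology, including internal changelog/repartition topics.
        System.out.println(streams.toString());

        streams.close();
    }
}
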
> >
> >
> > -Matthias
> >
> >
> >
> > On 11/8/16 11:14 AM, Eno Thereska wrote:
> > > Hi Sachin,
> > >
> > > One option right now would be to pre-create all internal topics in
> > > Kafka, and only after that start the Kafka Streams application.
> > > This would require you to know the internal names of the topics (in
> > > this case you probably already know them, but I agree that in general
> > > this is a bit cumbersome).
> > >
> > > Eno
> > >
> > >> On 8 Nov 2016, at 18:10, Sachin Mittal <sjmit...@gmail.com>
> > >> wrote:
> > >>
> > >> Per-message payload size. The basic question is how I can control
> > >> the internal changelog topic's parameters so as to avoid these
> > >> errors.
> > >>
> > >>
> > >> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna <krishna...@gmail.com>
> > >> wrote:
> > >>
> > >>> Are you talking about total messages and therefore size, or
> > >>> per-message payload size?
> > >>>
> > >>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
> > >>> <sjmit...@gmail.com> wrote:
> > >>>
> > >>>> The message size itself increases over time.
> > >>>>
> > >>>> A message is something like key=[list of objects]
> > >>>>
> > >>>> This grows with time, and then at some point Kafka is not
> > >>>> able to add any message to the topic because the message size is
> > >>>> greater than max.message.bytes. Since this is an internal
> > >>>> topic based off a table, I don't know how I can control this
> > >>>> topic.
> > >>>>
> > >>>> If I can set some retention.ms for this topic then I can
> > >>>> purge old messages, thereby ensuring that the message size stays
> > >>>> within the limit.
> > >>>>
> > >>>> Thanks Sachin
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
> > >>>> <eno.there...@gmail.com> wrote:
> > >>>>
> > >>>>> Hi Sachin,
> > >>>>>
> > >>>>> Could you clarify what you mean by "message size increases"?
> > >>>>> Are messages going to the changelog topic increasing in size?
> > >>>>> Or is the changelog topic getting full?
> > >>>>>
> > >>>>> Thanks Eno
> > >>>>>
> > >>>>>> On 8 Nov 2016, at 16:49, Sachin Mittal
> > >>>>>> <sjmit...@gmail.com> wrote:
> > >>>>>>
> > >>>>>> Hi, We are using aggregation by key on a kstream to
> > >>>>>> create a ktable. As I read from
> > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
> > >>>>>> it creates an internal changelog topic.
> > >>>>>>
> > >>>>>> However, the longer the streaming application runs, the larger
> > >>>>>> the message size gets, and eventually it starts throwing a
> > >>>>>> max.message.bytes exception.
> > >>>>>>
> > >>>>>> Is there a way to control the retention.ms time for internal
> > >>>>>> changelog topics so that messages are purged before they exceed
> > >>>>>> this size?
> > >>>>>>
> > >>>>>> If not, is there a way to control or avoid such an error?
> > >>>>>>
> > >>>>>> Thanks Sachin
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>>
> > >>> -- Radha Krishna, Proddaturi 253-234-5657
> > >>>
> > >
> >
>
