There is no built-in support for broadcast variables. Loops are also not supported -- the dataflow graph must be a DAG.

You could use an additional topic, plus an extra (user-instantiated) producer and consumer (using manual topic-partition assignment!) alongside your code to build your own feedback/broadcast loop.
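For illustration, a minimal sketch of such a feedback loop with the plain Java clients. The topic name "feedback-topic", its single partition, and the String types are placeholders, not something Streams provides for you:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class FeedbackLoopSketch {

    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());

        // Downstream side (e.g. in the output sink): publish the values that can
        // be dropped from the list to the feedback topic.
        try (KafkaProducer<String, String> feedbackProducer = new KafkaProducer<>(producerProps)) {
            feedbackProducer.send(new ProducerRecord<>("feedback-topic", "some-key", "value-to-remove"));
            feedbackProducer.flush();
        }

        // Upstream side (e.g. before the aggregation runs): read the feedback topic
        // with manual partition assignment -- no consumer group, no rebalancing.
        TopicPartition feedbackPartition = new TopicPartition("feedback-topic", 0);
        try (KafkaConsumer<String, String> feedbackConsumer = new KafkaConsumer<>(consumerProps)) {
            feedbackConsumer.assign(Collections.singletonList(feedbackPartition));
            feedbackConsumer.seekToBeginning(Collections.singletonList(feedbackPartition));
            ConsumerRecords<String, String> records = feedbackConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // Use these values in the aggregator to remove old entries from the list.
                System.out.println("feedback: " + record.key() + " -> " + record.value());
            }
        }
    }
}

Using assign() rather than subscribe() keeps this helper consumer outside of any consumer group, so it does not interfere with the Streams application's own rebalancing.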
-Matthias

On 11/10/16 8:40 PM, Sachin Mittal wrote:
> Hi,
> On the subject of deleting values from the list, I have the following
> topology: aggregate(old list) = new list -> left join (another list)
> -> output sink.
>
> While processing the output list I know which values in the list are
> old and can be removed. Is there any way to pass that information from
> downstream back to upstream? Any thoughts on how I can pass this
> information?
>
> One thing I can think of is that I can set some global variable in the
> output sink.
>
> So the next time the aggregate function runs, it can look up the global
> variable and remove items from the list. So new list = old list + new
> value added - old values removed.
>
> In Spark we have something like broadcast variables to do the same.
>
> Is there any similar concept in Kafka Streams?
>
> This way we can keep the changelog topic messages from growing and
> prevent the max.message.bytes exception.
>
> Thanks
> Sachin
>
> On Fri, Nov 11, 2016 at 1:02 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
>
> Sachin,
>
> my comment about deleting was about deleting values from the list, not
> about deleting the whole key/value record.
>
> If you want to delete a whole key/value record when there is no update
> for it for some time, you can combine compaction with retention. You
> need to alter the configuration of the changelog topics and set
>
> cleanup.policy=compact,delete
>
> Then retention.ms will be applied to the changelog, too.
>
> -Matthias
>
> On 11/10/16 3:17 AM, Sachin Mittal wrote:
>>>> Hi,
>>>> As per Eno's suggestion I have pre-created the internal changelog
>>>> topics with an increased max.message.bytes config to handle big
>>>> messages that grow over time.
>>>>
>>>> As Matthias has pointed out, we cannot use the retention.ms setting
>>>> to delete older message data after a given time. Is there a way to
>>>> purge older messages from my changelog topic?
>>>>
>>>> Remember my changelog topic is key=list of objects and this grows
>>>> with time.
>>>>
>>>> So I would like these to be deleted from time to time, because I
>>>> would have already consumed the objects, so that key/value can be
>>>> deleted. Later, if I get a new object for the same key, then that's
>>>> a new message and the old data has no use for the streaming
>>>> application.
>>>>
>>>> So how can I achieve this? Would retention.bytes help here?
>>>>
>>>> Is there a way I can set an expire-after (or something like that) at
>>>> the message level, so that some Kafka thread would purge those
>>>> messages?
>>>>
>>>> Thanks
>>>> Sachin
>>>>
>>>> On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax
>>>> <matth...@confluent.io> wrote:
>>>>
>>>> My two cents:
>>>>
>>>> Changelog topics are compacted topics, thus they do not have a
>>>> retention time (there is an exception for windowed KTable changelog
>>>> topics, which are compacted and do have a retention time though).
>>>>
>>>> However, I do not understand how changing the retention time should
>>>> fix the issue. If your list of values grows and exceeds
>>>> max.message.bytes, you will need to increase this parameter (or
>>>> shrink your value).
>>>>
>>>> Besides this, Eno's answer is the way to go. In order to figure out
>>>> the internal topic names, you can use KafkaStreams#toString().
>>>>
>>>> -Matthias
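To illustrate that KafkaStreams#toString() tip, a minimal sketch against the 0.10.x-era API used in this thread (newer releases expose the same information via Topology#describe(); the application id and topic names are placeholders):

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PrintInternalTopicNames {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // prefix of all internal topics
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        // ... define the real topology (stream -> aggregate -> leftJoin -> sink) here;
        // a trivial pass-through is used as a placeholder:
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Thread.sleep(5000);  // give the instance a moment to create its tasks

        // The dump contains the topology and the internal topic names, e.g.
        // "<application.id>-<state-store-name>-changelog" and "...-repartition".
        System.out.println(streams.toString());

        streams.close();
    }
}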
>>>> On 11/8/16 11:14 AM, Eno Thereska wrote:
>>>>>>> Hi Sachin,
>>>>>>>
>>>>>>> One option right now would be to pre-create all internal topics
>>>>>>> in Kafka, and only after that start the Kafka Streams
>>>>>>> application. This would require you to know the internal names
>>>>>>> of the topics (in this case you probably already know them, but
>>>>>>> I agree that in general this is a bit cumbersome).
>>>>>>>
>>>>>>> Eno
>>>>>>>
>>>>>>>> On 8 Nov 2016, at 18:10, Sachin Mittal
>>>>>>>> <sjmit...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Per-message payload size. The basic question is how I can
>>>>>>>> control the internal changelog topics' parameters so as to
>>>>>>>> avoid these errors.
>>>>>>>>
>>>>>>>> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna
>>>>>>>> <krishna...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Are you talking about total messages (and therefore size) or
>>>>>>>>> per-message payload size?
>>>>>>>>>
>>>>>>>>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal
>>>>>>>>> <sjmit...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> The message size itself increases over time.
>>>>>>>>>>
>>>>>>>>>> A message is something like key=[list of objects].
>>>>>>>>>>
>>>>>>>>>> This grows with time, and at some point Kafka is not able to
>>>>>>>>>> add the message to its topic because the message size is
>>>>>>>>>> greater than max.message.bytes. Since this is an internal
>>>>>>>>>> topic based off a table, I don't know how I can control this
>>>>>>>>>> topic.
>>>>>>>>>>
>>>>>>>>>> If I can set some retention.ms for this topic then I can
>>>>>>>>>> purge old messages, thereby ensuring that the message size
>>>>>>>>>> stays within the limit.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Sachin
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska
>>>>>>>>>> <eno.there...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Sachin,
>>>>>>>>>>>
>>>>>>>>>>> Could you clarify what you mean by "message size increases"?
>>>>>>>>>>> Are messages going to the changelog topic increasing in
>>>>>>>>>>> size? Or is the changelog topic getting full?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Eno
>>>>>>>>>>>
>>>>>>>>>>>> On 8 Nov 2016, at 16:49, Sachin Mittal
>>>>>>>>>>>> <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> We are using aggregation by key on a KStream to create a
>>>>>>>>>>>> KTable. As I read from
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
>>>>>>>>>>>> it creates an internal changelog topic.
>>>>>>>>>>>>
>>>>>>>>>>>> However, the longer the streaming application runs, the
>>>>>>>>>>>> more the message size increases, and it starts throwing the
>>>>>>>>>>>> max.message.bytes exception.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a way to control the retention.ms time for
>>>>>>>>>>>> internal changelog topics so that messages are purged
>>>>>>>>>>>> before they exceed this size?
>>>>>>>>>>>>
>>>>>>>>>>>> If not, is there a way to control or avoid such an error?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Sachin
>>>>>>>>> -- Radha Krishna, Proddaturi 253-234-5657
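To make the advice in this thread concrete -- pre-creating the internal changelog topic with a larger max.message.bytes, or switching it to cleanup.policy=compact,delete so that retention.ms applies -- here is a rough sketch using the Java AdminClient. Note that the AdminClient only exists since Kafka 0.11; with the 0.10.x versions discussed in this thread the same settings would be applied via the kafka-topics.sh and kafka-configs.sh command-line tools. The topic name, partition count, and config values are placeholders:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;

public class PrepareChangelogTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Internal changelog topics follow the pattern
        // <application.id>-<state-store-name>-changelog.
        String changelogTopic = "my-app-aggregate-store-changelog";

        try (AdminClient admin = AdminClient.create(props)) {
            // Option 1: pre-create the changelog topic with the desired settings
            // before the Streams application is started.
            Map<String, String> topicConfig = new HashMap<>();
            topicConfig.put("cleanup.policy", "compact,delete"); // compaction plus time-based retention
            topicConfig.put("retention.ms", "604800000");        // 7 days, honored because of "delete"
            topicConfig.put("max.message.bytes", "10485760");    // allow larger aggregated records (10 MB)
            NewTopic changelog = new NewTopic(changelogTopic, 1, (short) 1).configs(topicConfig);
            admin.createTopics(Collections.singletonList(changelog)).all().get();

            // Option 2: if the topic already exists, alter its configuration instead.
            // Note: alterConfigs replaces the full set of dynamically set topic configs;
            // newer brokers also offer incrementalAlterConfigs.
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, changelogTopic);
            Config newConfig = new Config(Collections.singletonList(
                    new ConfigEntry("cleanup.policy", "compact,delete")));
            admin.alterConfigs(Collections.singletonMap(resource, newConfig)).all().get();
        }
    }
}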