There is no built-in support for broadcast variables. Loops are also
not supported -- the dataflow graph must be a DAG.

You could use an additional topic, plus an extra (user-instantiated)
producer and consumer (using manual topic-partition assignment!) within
your code to build your own feedback/broadcast loop.
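
A rough sketch of this pattern (not from the original mail; the topic
name "feedback-topic", the key/value strings, and the serializers are
hypothetical placeholders, and error handling is omitted):

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.TopicPartition;

  public class FeedbackLoopSketch {
      public static void main(String[] args) {
          // extra producer, instantiated in your own code, that writes
          // the feedback information to the additional topic
          Properties producerProps = new Properties();
          producerProps.put("bootstrap.servers", "localhost:9092");
          producerProps.put("key.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");
          producerProps.put("value.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");
          KafkaProducer<String, String> feedbackProducer =
              new KafkaProducer<>(producerProps);
          feedbackProducer.send(
              new ProducerRecord<>("feedback-topic", "some-key", "ids-to-remove"));

          // extra consumer that reads the feedback topic; note the manual
          // partition assignment via assign() instead of subscribe(), so
          // it does not join a consumer group and never rebalances
          Properties consumerProps = new Properties();
          consumerProps.put("bootstrap.servers", "localhost:9092");
          consumerProps.put("key.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
          consumerProps.put("value.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
          KafkaConsumer<String, String> feedbackConsumer =
              new KafkaConsumer<>(consumerProps);
          feedbackConsumer.assign(
              Collections.singletonList(new TopicPartition("feedback-topic", 0)));
          ConsumerRecords<String, String> feedback = feedbackConsumer.poll(100);
      }
  }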


-Matthias


On 11/10/16 8:40 PM, Sachin Mittal wrote:
> Hi, On the subject of deleting values from the list, I have the
> following topology: aggregate(old list) = new list -> left join
> (another list) -> output sink.
> 
> While processing the output list I know which values in the list
> are old and can be removed. Is there any way to pass that
> information from downstream back to upstream? Any thoughts on
> how I can pass this information?
> 
> One thing I can think of is that I can set some global variable in
> the output sink.
> 
> So the next time the aggregate function runs, it can look up the
> global variable and remove those items from the list. So new list =
> old list + new values added - old values removed.
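
That idea can be sketched roughly like this (a process-local sketch
only, with hypothetical names; each JVM would have its own "global"
set, so it would not work across multiple application instances):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;

  public class RemovalSketch {
      // shared, process-local "global variable" filled by the output sink
      static final Set<String> idsToRemove = ConcurrentHashMap.newKeySet();

      // inside the aggregator: new list = old list + new value - old values
      static List<String> aggregate(List<String> oldList, String newValue) {
          List<String> newList = new ArrayList<>(oldList);
          newList.removeAll(idsToRemove); // drop values flagged downstream
          newList.add(newValue);          // add the newly arrived value
          return newList;
      }
  }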
> 
> In Spark we have something like broadcast variables to do the
> same.
> 
> Is there any similar concept in Kafka Streams?
> 
> This way we can keep the changelog topic messages from growing and
> prevent the max.message.bytes exception.
> 
> Thanks Sachin
> 
> 
> 
> 
> 
> On Fri, Nov 11, 2016 at 1:02 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
> 
> Sachin,
> 
> my comment about deleting was about deleting values from the list,
> not about deleting the whole key/value record.
> 
> If you want to delete a whole key/value record if there is no
> update for it for some time, you can combine compaction with
> retention. You need to alter the configuration of the changelog
> topics and set
> 
> cleanup.policy=compact,delete
> 
> Then, retention.ms will be applied to the changelog, too.
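
For example (hypothetical value): setting retention.ms=604800000
(7 days) alongside cleanup.policy=compact,delete on the changelog
topic would let the broker also delete entries for keys that have not
been updated within that window, instead of keeping the last value per
key forever.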
> 
> 
> -Matthias
> 
> On 11/10/16 3:17 AM, Sachin Mittal wrote:
>>>> Hi, As per Eno's suggestion I have pre-created the internal
>>>> changelog topics with an increased max.message.bytes config to
>>>> handle big messages that grow over time.
>>>> 
>>>> As Matthias has pointed out, we cannot use the retention.ms
>>>> setting to delete older message data after a given time. Is
>>>> there a way to purge older messages from my changelog topic?
>>>> 
>>>> Remember my changelog topic is key=list of objects and this
>>>> grows with time.
>>>> 
>>>> So I would like these to be deleted from time to time, because
>>>> I would have already consumed the objects, so the key/value
>>>> can be deleted. Later, if I get a new object for the same key,
>>>> that is a new message and the old data has no use for the
>>>> streaming application.
>>>> 
>>>> So how can I achieve this? Would retention.bytes
>>>> help here?
>>>> 
>>>> Is there a way I can set an expire-after (or something like
>>>> that) at the message level, so that some Kafka thread would
>>>> purge those messages?
>>>> 
>>>> Thanks Sachin
>>>> 
>>>> 
>>>> 
>>>> On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax 
>>>> <matth...@confluent.io> wrote:
>>>> 
>>>> My two cents:
>>>> 
>>>> Changelog topics are compacted topics, thus they do not have
>>>> a retention time (there is an exception for windowed KTable
>>>> changelog topics, which are compacted and do have a retention
>>>> time, though).
>>>> 
>>>> However, I do not understand how changing the retention time
>>>> should fix the issue. If your list of values grows and
>>>> exceeds max.message.bytes, you will need to increase this
>>>> parameter (or shrink your value).
>>>> 
>>>> Besides this, Eno's answer is the way to go. In order to
>>>> figure out internal topic names, you can use
>>>> KafkaStreams#toString().
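
A minimal sketch of this (not from the original mail; the application
id, bootstrap servers, and topology are hypothetical placeholders):

  import java.util.Properties;
  import org.apache.kafka.streams.KafkaStreams;
  import org.apache.kafka.streams.StreamsConfig;
  import org.apache.kafka.streams.kstream.KStreamBuilder;

  public class PrintInternalTopics {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
          props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

          KStreamBuilder builder = new KStreamBuilder();
          // ... build the topology here (aggregate, leftJoin, etc.) ...

          KafkaStreams streams = new KafkaStreams(builder, props);
          streams.start();

          // per Matthias' hint, the string representation can be used
          // to figure out the names of the internal topics
          System.out.println(streams.toString());
      }
  }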
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> 
>>>> 
>>>> On 11/8/16 11:14 AM, Eno Thereska wrote:
>>>>>>> Hi Sachin,
>>>>>>> 
>>>>>>> One option right now would be to pre-create all the
>>>>>>> internal topics in Kafka, and only after that start the
>>>>>>> Kafka Streams application. This would require you to know
>>>>>>> the internal names of the topics (in this case you
>>>>>>> probably already know them, but I agree that in general
>>>>>>> this is a bit cumbersome).
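
(For reference: the changelog topics that the Streams DSL creates
follow the naming convention <application.id>-<state store name>-changelog,
so with application.id "my-app" and a store named "agg-store" -- both
hypothetical names -- the topic to pre-create would be
my-app-agg-store-changelog.)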
>>>>>>> 
>>>>>>> Eno
>>>>>>> 
>>>>>>>> On 8 Nov 2016, at 18:10, Sachin Mittal 
>>>>>>>> <sjmit...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Per-message payload size. The basic question is how
>>>>>>>> I can control the internal changelog topic
>>>>>>>> parameters so as to avoid these errors.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Nov 8, 2016 at 11:37 PM, R Krishna 
>>>>>>>> <krishna...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Are you talking about total messages (and therefore
>>>>>>>>> total size), or per-message payload size?
>>>>>>>>> 
>>>>>>>>> On Tue, Nov 8, 2016 at 10:00 AM, Sachin Mittal 
>>>>>>>>> <sjmit...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Message size itself increases over time.
>>>>>>>>>> 
>>>>>>>>>> A message is something like key=[list of objects].
>>>>>>>>>> 
>>>>>>>>>> This grows with time, and at some point Kafka is
>>>>>>>>>> not able to add the message to its topic because
>>>>>>>>>> the message size is greater than
>>>>>>>>>> max.message.bytes. Since this is an internal
>>>>>>>>>> topic based off a table, I don't know how I can
>>>>>>>>>> control this topic.
>>>>>>>>>> 
>>>>>>>>>> If I could set some retention.ms for this topic,
>>>>>>>>>> then I could purge old messages, thereby ensuring
>>>>>>>>>> that the message size stays within the limit.
>>>>>>>>>> 
>>>>>>>>>> Thanks Sachin
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Tue, Nov 8, 2016 at 11:22 PM, Eno Thereska 
>>>>>>>>>> <eno.there...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi Sachin,
>>>>>>>>>>> 
>>>>>>>>>>> Could you clarify what you mean by "message
>>>>>>>>>>> size increases"? Are messages going to the
>>>>>>>>>>> changelog topic increasing in size? Or is the
>>>>>>>>>>> changelog topic getting full?
>>>>>>>>>>> 
>>>>>>>>>>> Thanks Eno
>>>>>>>>>>> 
>>>>>>>>>>>> On 8 Nov 2016, at 16:49, Sachin Mittal 
>>>>>>>>>>>> <sjmit...@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi, We are using aggregation by key on a
>>>>>>>>>>>> KStream to create a KTable. As I read from
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
>>>>>>>>>>>> it creates an internal changelog topic.
>>>>>>>>>>>> 
>>>>>>>>>>>> However, the longer the streaming application
>>>>>>>>>>>> runs, the more the message size increases, and
>>>>>>>>>>>> eventually it starts throwing the
>>>>>>>>>>>> max.message.bytes exception.
>>>>>>>>>>>> 
>>>>>>>>>>>> Is there a way to control the retention.ms
>>>>>>>>>>>> time for internal changelog topics so that
>>>>>>>>>>>> messages are purged before they exceed this
>>>>>>>>>>>> size?
>>>>>>>>>>>> 
>>>>>>>>>>>> If not, is there a way to control or avoid
>>>>>>>>>>>> such an error?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks Sachin
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> -- Radha Krishna, Proddaturi 253-234-5657
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> 
> 
