Hey,

Changelog topics are compacted topics, and no retention time is applied
to them (the one exception is window changelog topics, which have both
compaction and a retention policy enabled).

If an input message is purged via retention time (and it is at your
latest committed offset), and you then start your Streams application,
it will resume according to the "auto.offset.reset" policy that you can
specify in StreamsConfig. So Streams will run just fine, but the data
is of course lost.
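
To make the resume behavior concrete, here is a rough sketch in plain
Java. It is a simplified model of the decision, not Kafka's actual
code: the class name, the method names, and the application id and
broker address are made up for illustration. Only the config keys
("application.id", "bootstrap.servers", "auto.offset.reset") and the
policy values are real.

```java
import java.util.OptionalLong;
import java.util.Properties;

/** Simplified model of how a start position is chosen; not Kafka's actual code. */
final class OffsetResetModel {

    /** Streams settings as plain key/value pairs; id and address are placeholders. */
    static Properties streamsProps(String resetPolicy) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
        props.put("auto.offset.reset", resetPolicy);        // "earliest", "latest" or "none"
        return props;
    }

    /**
     * @param committed last committed offset, empty if nothing was ever committed
     * @param logStart  first offset still present in the partition
     * @param logEnd    offset the next produced record would get
     */
    static long resolveStartOffset(OptionalLong committed, long logStart,
                                   long logEnd, String resetPolicy) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // committed offset still exists: resume there
            }
        }
        // Committed offset is missing or was purged: the reset policy decides.
        switch (resetPolicy) {
            case "earliest": return logStart;
            case "latest":   return logEnd;
            default:
                throw new IllegalStateException(
                    "no valid offset and reset policy is " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        String policy = streamsProps("earliest").getProperty("auto.offset.reset");
        // Committed offset 100 was purged; the log now spans offsets 250 to 400.
        System.out.println(resolveStartOffset(OptionalLong.of(100), 250, 400, policy));   // 250
        System.out.println(resolveStartOffset(OptionalLong.of(100), 250, 400, "latest")); // 400
        // Committed offset 300 is still in the log, so the policy is not consulted.
        System.out.println(resolveStartOffset(OptionalLong.of(300), 250, 400, "latest")); // 300
    }
}
```

In the first two calls the records between the committed offset and
the new log start are gone either way; the policy only decides where
processing picks up again.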

For repartitioning topics the same argument applies.

> I am asking this because I am planning to keep the retention time
> for internal changelog topics also small so no message gets big
> enough to start getting exceptions.

I don't understand this part though...

There is no API or tooling available at the moment to set an arbitrary
start offset. However, we plan to add some of this in future releases.

For now, you could set start offsets "manually" by writing a small
consumer application that does not process data but only calls seek()
to move to (and commit() to store) the start offsets you want to use.
The Streams application reset tool is built on a similar idea. See this
blog post for details:

https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

However, you should be careful not to mess with internally kept state
(i.e., make sure it is still semantically meaningful and compatible if
you start to modify offsets).
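
A rough sketch of such a seek-and-commit helper, assuming the
kafka-clients library is on the classpath and the Streams application
is stopped while it runs. The class name, broker address, application
id, partition, and offset are placeholders I made up; "source-topic"
is taken from your description. The group.id must be the same as your
Streams application.id, because that is the consumer group Streams
commits under.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class SetStartOffsets {

    /** Converts plain offsets into the map type that commitSync() expects. */
    static Map<TopicPartition, OffsetAndMetadata> toCommitMap(
            Map<TopicPartition, Long> startOffsets) {
        Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : startOffsets.entrySet()) {
            result.put(e.getKey(), new OffsetAndMetadata(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "my-streams-app");          // = Streams application.id
        props.put("enable.auto.commit", "false");         // commit only what we set below
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Hypothetical partition and start offset.
        Map<TopicPartition, Long> startOffsets = new HashMap<>();
        startOffsets.put(new TopicPartition("source-topic", 0), 42L);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment: no group rebalance and no records are processed.
            consumer.assign(startOffsets.keySet());
            for (Map.Entry<TopicPartition, Long> e : startOffsets.entrySet()) {
                consumer.seek(e.getKey(), e.getValue());
            }
            // Store the chosen positions as the group's committed offsets.
            consumer.commitSync(toCommitMap(startOffsets));
        }
    }
}
```

The seek() calls mirror the description above; since the offsets are
passed to commitSync() explicitly, the commit alone is what the
restarted application will pick up.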


Hope this helps.

-Matthias

On 11/9/16 7:29 AM, Sachin Mittal wrote:
> Hi, what happens when a message that is later than the last offset
> stored by the stream consumer is purged by Kafka, via the retention
> time setting or something else?
> 
> I am asking this because I am planning to keep the retention time
> for internal changelog topics also small so no message gets big
> enough to start getting exceptions.
> 
> So if messages from the last offset onward are deleted, will there
> be any issues?
> 
> Also, is there any way to control or set the offset manually when we
> restart the streaming application, so that certain old messages are
> not consumed at all because, logic-wise, they are no longer useful to
> the streaming application? For example, past user sessions created
> while the streaming application was stopped.
> 
> 
> Thanks Sachin
> 
> 
> On Wed, Nov 9, 2016 at 7:46 PM, Eno Thereska
> <[email protected]> wrote:
> 
>> Hi Sachin,
>> 
>> Kafka Streams is built on top of standard Kafka consumers. For
>> every topic it consumes from (whether changelog topic or source
>> topic, it doesn't matter), the consumer stores the offset it last
>> consumed from. Upon restart, by default it starts consuming from
>> where it left off in each of the topics. So you can think of it
>> this way: a restart should be no different than if you had left
>> the application running (i.e., no restart).
>> 
>> Thanks Eno
>> 
>> 
>>> On 9 Nov 2016, at 13:59, Sachin Mittal <[email protected]>
>>> wrote:
>>> 
>>> Hi, I had some basic questions on sequence of tasks for
>>> streaming application restart in case of failure or otherwise.
>>> 
>>> Say my stream is structured this way
>>> 
>>> - source-topic branched into 2 kstreams:
>>>   source-topic-1, source-topic-2
>>> - each mapped to 2 new kstreams (new key,value pairs) backed by
>>>   2 kafka topics: source-topic-1-new, source-topic-2-new
>>> - each aggregated to a new ktable backed by internal changelog
>>>   topics: source-topic-1-new-table (source-topic-1-new-changelog),
>>>   source-topic-2-new-table (source-topic-2-new-changelog)
>>> - table1 left join table2 -> to final stream
>>> - results of the final stream are then persisted into another
>>>   data storage
>>> 
>>> So, as you can see, I have the following physical topics or state
>>> stores:
>>> 
>>> - source-topic
>>> - source-topic-1-new
>>> - source-topic-2-new
>>> - source-topic-1-new-changelog
>>> - source-topic-2-new-changelog
>>> 
>>> Now, at a given point, if the streaming application is stopped,
>>> there is some data in all these topics. Barring the source-topic,
>>> all other topics have data inserted by the streaming application.
>>> 
>>> Also, I suppose the streaming application stores, for each topic,
>>> the offset where it last was.
>>> 
>>> So when I restart the application, how does the processing start
>>> again? Will it pick up the data from where it last left off in the
>>> changelog topics and process that first, and then process the
>>> source topic data from the offset where it last left off?
>>> 
>>> Or will it start from the source topic? I really don't want it to
>>> maintain offsets for the changelog tables, because any old key's
>>> value can be modified again as part of the aggregation.
>>> 
>>> Bit confused here, any light would help a lot.
>>> 
>>> Thanks Sachin
>> 
>> 
> 