That is correct.

On Thu, 10 Nov 2016 at 11:50 Sachin Mittal <[email protected]> wrote:
> Hi,
>
> The reset tool looks like a great feature.
>
> Following this link:
>
> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>
> What I understand is that this tool resets the offsets for internal and
> intermediate topics and also deletes all the internal local storage and
> topics. Please confirm this.
>
> I was actually doing all this manually.
>
> Thanks
> Sachin
>
> On Thu, Nov 10, 2016 at 12:05 AM, Matthias J. Sax <[email protected]>
> wrote:
>
> > Hey,
> >
> > Changelog topics are compacted topics, and no retention time is applied
> > (one exception is window-changelog topics, which have both compaction
> > and a retention policy enabled).
> >
> > If an input message is purged via retention time (and this is your
> > latest committed offset), and you start your Streams application, it
> > will resume according to the "auto.offset.reset" policy, which you can
> > specify in StreamsConfig. So Streams will just run fine, but the data
> > is of course lost.
> >
> > For repartitioning topics, the same argument applies.
> >
> > > I am asking this because I am planning to keep the retention time
> > > for internal changelog topics also small so no message gets big
> > > enough to start getting exceptions.
> >
> > I don't understand this part, though...
> >
> > To set an arbitrary start offset, there is no API or tooling available
> > at the moment. However, we plan to add some of this in future releases.
> >
> > For now, you could set start offsets "manually" by writing a small
> > consumer application that does not process any data but only seek()s to
> > (and commit()s) the start offsets you want to use. The Streams
> > application reset tool is built on a similar idea.
> > See this blog post for details:
> >
> > https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
> >
> > However, you should be careful not to mess with internally kept state
> > (i.e., make sure it is still semantically meaningful and compatible if
> > you start to modify offsets...).
> >
> > Hope this helps.
> >
> > -Matthias
> >
> > On 11/9/16 7:29 AM, Sachin Mittal wrote:
> > > Hi,
> > >
> > > What happens when messages later than the last offset stored by the
> > > stream consumer are purged by Kafka, via the retention time setting
> > > or otherwise?
> > >
> > > I am asking this because I am planning to keep the retention time
> > > for internal changelog topics also small so no message gets big
> > > enough to start getting exceptions.
> > >
> > > So if messages from the last offset onward are deleted, will there
> > > be any issues?
> > >
> > > Also, is there any way to control or set the offset manually when we
> > > restart the streaming application, so that certain old messages are
> > > not consumed at all because, logically, they are no longer useful to
> > > the streaming application? For example, past user sessions created
> > > while the streaming application was stopped.
> > >
> > > Thanks Sachin
> > >
> > > On Wed, Nov 9, 2016 at 7:46 PM, Eno Thereska
> > > <[email protected]> wrote:
> > >
> > >> Hi Sachin,
> > >>
> > >> Kafka Streams is built on top of standard Kafka consumers. For
> > >> every topic it consumes from (whether changelog topic or source
> > >> topic, it doesn't matter), the consumer stores the offset it last
> > >> consumed from. Upon restart, by default it starts consuming from
> > >> where it left off in each of the topics. So you can think of it
> > >> this way: a restart should be no different than if you had left
> > >> the application running (i.e., no restart).
> > >>
> > >> Thanks Eno
> > >>
> > >>> On 9 Nov 2016, at 13:59, Sachin Mittal <[email protected]>
> > >>> wrote:
> > >>>
> > >>> Hi, I had some basic questions on the sequence of tasks for a
> > >>> streaming application restart, in case of failure or otherwise.
> > >>>
> > >>> Say my stream is structured this way:
> > >>>
> > >>> source-topic
> > >>>   branched into 2 KStreams:
> > >>>     source-topic-1
> > >>>     source-topic-2
> > >>>   each mapped to 2 new KStreams (new key/value pairs) backed by
> > >>>   2 Kafka topics:
> > >>>     source-topic-1-new
> > >>>     source-topic-2-new
> > >>>   each aggregated to a new KTable backed by internal changelog
> > >>>   topics:
> > >>>     source-topic-1-new-table (source-topic-1-new-changelog)
> > >>>     source-topic-2-new-table (source-topic-2-new-changelog)
> > >>>   table1 left join table2 -> final stream
> > >>> Results of the final stream are then persisted into another data
> > >>> store.
> > >>>
> > >>> So, as you can see, I have the following physical topics or state
> > >>> stores:
> > >>>   source-topic
> > >>>   source-topic-1-new
> > >>>   source-topic-2-new
> > >>>   source-topic-1-new-changelog
> > >>>   source-topic-2-new-changelog
> > >>>
> > >>> Now, if the streaming application is stopped at a given point,
> > >>> there is some data in all these topics. Except for source-topic,
> > >>> all other topics have data inserted by the streaming application.
> > >>>
> > >>> Also, I suppose the streaming application stores, for each topic,
> > >>> the offset where it last was.
> > >>>
> > >>> So when I restart the application, how does processing start
> > >>> again? Will it pick up the data in the changelog topics from where
> > >>> it last left off and process it first, and then process the source
> > >>> topic data from the last offset?
> > >>>
> > >>> Or will it start from the source topic? I really don't want it to
> > >>> maintain offsets for the changelog tables, because any old key's
> > >>> value can be modified again as part of aggregation.
> > >>>
> > >>> I'm a bit confused here; any light would help a lot.
> > >>>
> > >>> Thanks Sachin
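The restart behaviour Eno and Matthias describe (resume from the last committed offset; if that offset no longer exists because retention purged it, fall back to the "auto.offset.reset" policy) can be captured in a small plain-Java model. This is an illustration only, not Kafka code: the class, enum, and method names are hypothetical, and it simplifies the real consumer, which detects the out-of-range case when a fetch fails rather than by comparing offsets up front. In a real application the policy is the "auto.offset.reset" setting (earliest, latest, or none) passed in the Streams configuration.

```java
import java.util.OptionalLong;

/** Hypothetical model (not Kafka code) of where a consumer resumes after a restart. */
public class ResumePositionModel {

    /** Mirrors the three values of the auto.offset.reset setting. */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed last committed offset for the partition, if any
     * @param logStart  first offset still in the log (older records were purged by retention)
     * @param logEnd    offset the next produced record will receive
     */
    static long resumePosition(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // normal restart: continue exactly where we left off
            }
        }
        // No committed offset, or it points at data that has been purged
        switch (policy) {
            case EARLIEST:
                return logStart; // oldest surviving record; purged data is simply lost
            case LATEST:
                return logEnd;   // skip everything currently in the log
            default:
                throw new IllegalStateException("no valid offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // Committed offset 120 is still inside the log [100, 500]: resume at 120
        System.out.println(resumePosition(OptionalLong.of(120), 100, 500, ResetPolicy.EARLIEST));
        // Committed offset 50 was purged (log now starts at 100): the policy decides
        System.out.println(resumePosition(OptionalLong.of(50), 100, 500, ResetPolicy.EARLIEST)); // 100
        System.out.println(resumePosition(OptionalLong.of(50), 100, 500, ResetPolicy.LATEST));   // 500
    }
}
```

As Matthias notes, either reset choice keeps the application running; the records between the old committed offset and the new log start are gone regardless.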

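Matthias's suggestion of a small consumer that only seek()s to and commit()s start offsets might look roughly like the following Java sketch, which skips input records older than a cut-off timestamp (Sachin's "past user sessions" case). Several assumptions apply: the broker address, the application id "my-streams-app", the one-hour cut-off, and the class and helper names are hypothetical; offsetsForTimes() requires 0.10.1 or newer clients and brokers; the group.id must equal the Streams application.id; and it should run only while every instance of the Streams application is stopped. It moves only the committed offsets of the input topic and leaves local state stores and changelog topics untouched, which is exactly the internal state Matthias warns about.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/** Sketch: commit new start offsets for a Streams application's input topic without processing records. */
public class SetStartOffsets {

    /** Hypothetical helper: use the offset found for the timestamp, else skip to the partition end. */
    static long chooseStartOffset(Long offsetForTimestamp, long endOffset) {
        return offsetForTimestamp != null ? offsetForTimestamp : endOffset;
    }

    public static void main(String[] args) {
        String applicationId = "my-streams-app";  // hypothetical; must equal the Streams application.id
        String inputTopic = "source-topic";       // input topic name from the thread
        long startTimestamp = System.currentTimeMillis() - 60 * 60 * 1000L; // hypothetical cut-off: one hour ago

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, applicationId);             // Streams uses application.id as group.id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor(inputTopic)) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            // assign() rather than subscribe(): no group membership and no polling of records
            consumer.assign(partitions);

            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition tp : partitions) {
                query.put(tp, startTimestamp);
            }
            // Earliest offset whose timestamp is >= startTimestamp (null entry if there is none)
            Map<TopicPartition, OffsetAndTimestamp> byTime = consumer.offsetsForTimes(query);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            for (TopicPartition tp : partitions) {
                OffsetAndTimestamp found = byTime.get(tp);
                long start = chooseStartOffset(found == null ? null : found.offset(), endOffsets.get(tp));
                consumer.seek(tp, start);
                consumer.position(tp); // resolve the new position before committing
            }
            // Commit the current positions (the offsets we just seeked to) for the group
            consumer.commitSync();
        }
    }
}
```

When the Streams application starts again, its consumers pick up these committed offsets just as they would after a normal restart. As far as I know, the reset tool itself also leaves each instance's local state directory alone; wiping it is usually done by calling KafkaStreams#cleanUp() before start().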