The reset tool does not have a parameter for this. But if you copy and
modify the code, it would be quite simple to change this behavior.

You can also always write your own tool from scratch to modify committed
offsets -- just use a consumer with the corresponding group.id, seek()
to the offsets you are interested in, and commit() them.
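
As a rough sketch of the setup for such a tool (not the reset tool's own
code): the helper consumer has to use the Streams application.id as its
group.id, and auto-commit should be off so that only the offsets you
seek() to get committed. The class name, broker address, and application
id below are placeholders I made up for illustration.

```java
import java.util.Properties;

/**
 * Sketch: configuration for a plain consumer that only repositions
 * committed offsets. All names and values here are illustrative.
 */
final class OffsetResetConsumerConfig {

    static Properties forStreamsApp(String bootstrapServers, String applicationId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Kafka Streams uses its application.id as the consumer group.id,
        // so the helper consumer must use the same value.
        props.put("group.id", applicationId);
        // Commit explicitly after seek(); do not let the client auto-commit.
        props.put("enable.auto.commit", "false");
        // The tool never reads records, so raw byte deserializers are enough.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and application id.
        Properties props = forStreamsApp("localhost:9092", "my-streams-app");
        System.out.println(props.getProperty("group.id"));
        // With kafka-clients on the classpath, the remaining steps would be roughly:
        //   KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        //   consumer.assign(partitions);          // Collection<TopicPartition>
        //   consumer.seek(partition, offset);     // once per partition
        //   consumer.commitSync(offsets);         // Map<TopicPartition, OffsetAndMetadata>
        //   consumer.close();
    }
}
```

The Streams application should be stopped while such a tool runs, so that
the consumer group is inactive when the offsets are committed.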

Afterward, when you start up the Streams application, it will pick up
those committed offsets and resume from there. If some data gets
written after your commit and before you start the application, that
data will of course be processed. Thus, if you really want to be at
"end of log" at startup, you would need to commit an invalid offset
(like Long.MAX_VALUE) and specify auto.offset.reset=latest in your
StreamsConfig.
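
A minimal sketch of the Streams side of that, assuming plain string keys
for the settings; the class name, broker address, and application id are
placeholders, not anything from the reset tool. The reset policy only
kicks in when the committed offset is missing or out of range, which is
why the committed offset has to be an invalid one.

```java
import java.util.Properties;

/**
 * Sketch: Streams properties that make the application jump to the end
 * of the log when its committed offset is invalid. Names are illustrative.
 */
final class StreamsLatestResetConfig {

    // The out-of-range offset the helper consumer would commit beforehand.
    static final long INVALID_OFFSET = Long.MAX_VALUE;

    static Properties build(String bootstrapServers, String applicationId) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Applied only if there is no committed offset or it is out of range.
        props.put("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; these properties would then be handed to the
        // Streams application at construction time.
        Properties props = build("localhost:9092", "my-streams-app");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```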


-Matthias


On 11/10/16 6:37 PM, Sachin Mittal wrote:
> In the
> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
> I see the following:
> seek to offset zero for all partitions of all input topics and
> commit the offset
> 
> also I see: the offsets for kept intermediate topics must be set to
> the largest value (i.e., to the current log-size) instead of zero.
> 
> Can we do the same for the input topic too, via this reset tool or
> some manual command? That is, we set it to the largest value, and when
> we restart we process only the latest messages from the input topic.
> As of now we are OK with some data getting lost while the streaming
> application is not running.
> 
> Thanks Sachin
> 
> 
> 
> On Fri, Nov 11, 2016 at 12:53 AM, Matthias J. Sax
> <[email protected]> wrote:
> 
> The tool resets all offsets of user input and intermediate topics
> as well as for all internal topics.
> 
> Furthermore, internal topics (intermediate and store changelogs)
> get deleted. But NOT local stores -- for local stores you need to
> leverage KafkaStreams#cleanUp()
> 
> See also:
> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
> 
> 
> -Matthias
> 
> On 11/10/16 3:48 AM, Sachin Mittal wrote:
>>>> Hi, The reset tool looks like a great feature.
>>>> 
>>>> So following this link
>>>> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>>>> 
>>>> What I understand is that this tool resets the offsets for 
>>>> internal and intermediate topics and also deletes all the
>>>> internal local storage and topics. Please confirm this.
>>>> 
>>>> I was actually doing all this manually.
>>>> 
>>>> Thanks Sachin
>>>> 
>>>> 
>>>> On Thu, Nov 10, 2016 at 12:05 AM, Matthias J. Sax 
>>>> <[email protected]> wrote:
>>>> 
>>>> Hey,
>>>> 
>>>> changelog topics are compacted topics and no retention time
>>>> is applied (one exception is window-changelog topics though,
>>>> which have both compaction and retention policy enabled).
>>>> 
>>>> If an input message is purged via retention time (and this is
>>>> your latest committed offset), and you start your Streams
>>>> application, it will resume according to the "auto.offset.reset"
>>>> policy that you can specify in StreamsConfig. So Streams will
>>>> just run fine, but the data is of course lost.
>>>> 
>>>> For repartitioning topics the same argument applies.
>>>> 
>>>>>>> I am asking this because I am planning to keep the
>>>>>>> retention time for internal changelog topics also small
>>>>>>> so no message gets big enough to start getting
>>>>>>> exceptions.
>>>> 
>>>> I don't understand this part though...
>>>> 
>>>> To set an arbitrary start offset there is no API or tooling 
>>>> available at the moment. However, we plan to add some of this
>>>> in future releases.
>>>> 
>>>> As for now, you could set start offsets "manually" by writing
>>>> a small consumer application that does not process data, but
>>>> only calls seek() to move to the start offsets you want to use
>>>> and commit() to store them. The Streams application reset tool
>>>> is built on a similar idea. See this blog post for details:
>>>> 
>>>> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>>>> 
>>>> However, you should be careful not to mess with internally
>>>> kept state (i.e., make sure it is still semantically meaningful
>>>> and compatible if you start to modify offsets...)
>>>> 
>>>> 
>>>> Hope this helps.
>>>> 
>>>> -Matthias
>>>> 
>>>> On 11/9/16 7:29 AM, Sachin Mittal wrote:
>>>>>>> Hi, What happens when a message that is later than the
>>>>>>> last offset stored by the stream consumer is itself
>>>>>>> purged by Kafka, via the retention time setting or
>>>>>>> something else?
>>>>>>> 
>>>>>>> I am asking this because I am planning to keep the
>>>>>>> retention time for internal changelog topics also small
>>>>>>> so no message gets big enough to start getting
>>>>>>> exceptions.
>>>>>>> 
>>>>>>> So if messages from last offset are deleted then will
>>>>>>> there be any issues?
>>>>>>> 
>>>>>>> Also, is there any way to control or set the offset
>>>>>>> manually when we restart the streaming application, so
>>>>>>> certain old messages are not consumed at all, as
>>>>>>> logic-wise they are no longer useful to the streaming
>>>>>>> application? For example, past user sessions created
>>>>>>> while the streaming application was stopped.
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks Sachin
>>>>>>> 
>>>>>>> 
>>>>>>> On Wed, Nov 9, 2016 at 7:46 PM, Eno Thereska 
>>>>>>> <[email protected]> wrote:
>>>>>>> 
>>>>>>>> Hi Sachin,
>>>>>>>> 
>>>>>>>> Kafka Streams is built on top of standard Kafka
>>>>>>>> consumers. For every topic it consumes from
>>>>>>>> (whether changelog topic or source topic, it doesn't
>>>>>>>> matter), the consumer stores the offset it last
>>>>>>>> consumed from. Upon restart, by default it starts
>>>>>>>> consuming from where it left off in each of the
>>>>>>>> topics. So you can think of it this way: a restart 
>>>>>>>> should be no different than if you had left the
>>>>>>>> application running (i.e., no restart).
>>>>>>>> 
>>>>>>>> Thanks Eno
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On 9 Nov 2016, at 13:59, Sachin Mittal 
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Hi, I had some basic questions on sequence of tasks
>>>>>>>>> for streaming application restart in case of
>>>>>>>>> failure or otherwise.
>>>>>>>>> 
>>>>>>>>> Say my stream is structured this way:
>>>>>>>>> 
>>>>>>>>> source-topic
>>>>>>>>> branched into 2 kstreams
>>>>>>>>>   source-topic-1
>>>>>>>>>   source-topic-2
>>>>>>>>> each mapped to 2 new kstreams (new key,value pairs)
>>>>>>>>> backed by 2 kafka topics
>>>>>>>>>   source-topic-1-new
>>>>>>>>>   source-topic-2-new
>>>>>>>>> each aggregated to new ktable backed by internal
>>>>>>>>> changelog topics
>>>>>>>>>   source-topic-1-new-table (scource-topic-1-new-changelog)
>>>>>>>>>   source-topic-2-new-table (scource-topic-2-new-changelog)
>>>>>>>>> table1 left join table2 -> to final stream
>>>>>>>>> Results of final stream are then persisted into
>>>>>>>>> another data storage
>>>>>>>>> 
>>>>>>>>> So, as you can see, I have the following physical
>>>>>>>>> topics or state stores:
>>>>>>>>>   source-topic
>>>>>>>>>   source-topic-1-new
>>>>>>>>>   source-topic-2-new
>>>>>>>>>   scource-topic-1-new-changelog
>>>>>>>>>   scource-topic-2-new-changelog
>>>>>>>>> 
>>>>>>>>> Now at a given point, if the streaming application
>>>>>>>>> is stopped, there is some data in all these topics.
>>>>>>>>> Barring the source-topic, all other topics have data
>>>>>>>>> inserted by the streaming application.
>>>>>>>>> 
>>>>>>>>> Also, I suppose the streaming application stores the
>>>>>>>>> offset for each of the topics, marking where it was
>>>>>>>>> last.
>>>>>>>>> 
>>>>>>>>> So when I restart the application, how does the
>>>>>>>>> processing start again? Will it pick up the data
>>>>>>>>> from where it last left off in the changelog topics
>>>>>>>>> and process that first, and then process the source
>>>>>>>>> topic data from the offset where it last left off?
>>>>>>>>> 
>>>>>>>>> Or will it start from the source topic? I really
>>>>>>>>> don't want it to maintain offsets for the changelog
>>>>>>>>> tables, because any old key's value can be modified
>>>>>>>>> again as part of aggregation.
>>>>>>>>> 
>>>>>>>>> Bit confused here, any light would help a lot.
>>>>>>>>> 
>>>>>>>>> Thanks Sachin
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> 
> 