Seems this question was cross posted on SO:

On 1/14/19 8:49 AM, Jonathan Santilli wrote:
> Hello Bill, thanks a lot for the reply,
> I will implement your recommendation about the
> *KafkaStreams#setGlobalStateRestoreListener.*
> About your question:
> *When you say you have used both "exactly once" and "at least once": for the*
> *"at least once" case, did you run for a while in that mode and then restart?*
> Yes, I have done that among other combinations, but I see the same behaviour.
> This is what I see in the logs after restart:
> INFO  [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
> internals.StoreChangelogReader ( -
> stream-thread [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
> No checkpoint found for task 1_8 state store
> KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> *APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8* with EOS turned
> on. Reinitializing the task and restore its state from the beginning.
> INFO  [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
> internals.Fetcher ( - [Consumer
> clientId=*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3-restore-consumer,
> groupId=] Resetting offset for partition
> *APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8
> to offset 0*.
> Before I restart, I always check the LAG for the consumer group (*APP-ID*)
> reading from the output topic 'outPutTopicNameOfGroupedData' to verify that
> it is 1. Immediately after the restart, once the logs above appear, the LAG
> for that consumer group (*APP-ID*) reading from the output topic
> 'outPutTopicNameOfGroupedData' goes up, increasing so much that the App
> reading from the 'outPutTopicNameOfGroupedData' topic re-processes the
> data.
> I hope someone can give me some clue; I would really appreciate it.
> Cheers!
> --
> Jonathan
> On Mon, Jan 14, 2019 at 4:12 PM Bill Bejeck <> wrote:
>> Hi Jonathan,
>> With EOS enabled, Kafka Streams does not use checkpoint files for restoring
>> state stores; it will replay the data contained in the changelog topic.
>> But this should not affect where consumption of the input source topic(s)
>> resumes after a restart. Also, the changelog topics are only consumed from
>> during a restore (or for keeping standby tasks up to date).
>> When you say you have used both "exactly once" and "at least once": for the
>> "at least once" case, did you run for a while in that mode and then restart?
>> You can confirm how much data, and from which offset, Streams is restoring
>> a state store by using a custom implementation of the StateRestoreListener
>> interface and setting it via KafkaStreams#setGlobalStateRestoreListener.
>> -Bill
>> On Mon, Jan 14, 2019 at 7:32 AM Jonathan Santilli <
>>> wrote:
>>> I have a Kafka Streams application for which, whenever I restart it, the
>>> offsets for the topic partitions (*KTABLE-SUPPRESS-STATE-STORE*) it is
>>> consuming get reset to 0. Hence, for all partitions, the lags increase and
>>> the app needs to reprocess all the data.
>>> I have ensured the lag is 1 for every partition before the restart. All
>>> consumers that belong to that consumer-group-id (app-id) are active. The
>>> restart is immediate; it takes around 30 secs.
>>> The app is using exactly once as processing guarantee.
>>> I have read this answer How does an offset expire for an Apache Kafka
>>> consumer group?
>>> <
>>> .
>>> I have tried with *auto.offset.reset = latest* and *auto.offset.reset =
>>> earliest*.
>>> I assume that after the restart the app should pick up from the latest
>>> committed offset for that consumer group.
>>> Is it possible to know why the offsets are getting reset to 0?
>>> I would really appreciate any clue about this.
>>> This is the code the App executes:
>>> final StreamsBuilder builder = new StreamsBuilder();
>>> final KStream<..., ...> events = builder
>>>         .stream(inputTopicNames, Consumed.with(..., ...)
>>>         .withTimestampExtractor(...));
>>> events
>>>     .filter((k, v) -> ...)
>>>     .flatMapValues(v -> ...)
>>>     .flatMapValues(v -> ...)
>>>     .selectKey((k, v) -> v)
>>>     .groupByKey(Grouped.with(..., ...))
>>>     .windowedBy(
>>>         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
>>>             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
>>>             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
>>>     .reduce((agg, next) -> {
>>>         ...
>>>         return agg;
>>>     })
>>>     .suppress(Suppressed.untilWindowCloses(
>>>                   Suppressed.BufferConfig.unbounded()))
>>>     .toStream()
>>>     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
>>> The offset reset happens only, and always (after restarting), with the
>>> *KTABLE-SUPPRESS-STATE-STORE* internal topic created by the Kafka Streams
>>> API.
>>> I have tried with the processing guarantees *exactly once* and *at least
>>> once*.
>>> Once again, I would really appreciate any clue about this.
>>> P.S: I have also posted the question in *SO*:
>>> Cheers!
>>> --
>>> Santilli Jonathan
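
For the restore-listener suggestion quoted above, here is a rough sketch of the
bookkeeping such a listener could do. It is only an illustration under some
assumptions: RestoreTracker is a hypothetical name, and to keep the sketch
compilable without the Kafka jars it only mirrors the three callbacks of
StateRestoreListener (onRestoreStart, onBatchRestored, onRestoreEnd), with the
TopicPartition argument flattened into a topic name and a partition number.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper: records where each changelog restore started and how
 * many records it replayed. The three callback methods mirror
 * org.apache.kafka.streams.processor.StateRestoreListener, but take
 * (topic, partition) instead of a TopicPartition (an assumption made so the
 * sketch has no Kafka dependency).
 */
final class RestoreTracker {

    // "topic-partition" -> offset the restore started from
    private final Map<String, Long> startOffsets = new ConcurrentHashMap<>();
    // "topic-partition" -> total records replayed, set when the restore ends
    private final Map<String, Long> totals = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    public void onRestoreStart(String topic, int partition, String storeName,
                               long startingOffset, long endingOffset) {
        startOffsets.put(key(topic, partition), startingOffset);
        System.out.printf("Restoring %s from %s: offsets %d to %d%n",
                storeName, key(topic, partition), startingOffset, endingOffset);
    }

    public void onBatchRestored(String topic, int partition, String storeName,
                                long batchEndOffset, long numRestored) {
        System.out.printf("  %s: batch of %d records, now at offset %d%n",
                key(topic, partition), numRestored, batchEndOffset);
    }

    public void onRestoreEnd(String topic, int partition, String storeName,
                             long totalRestored) {
        totals.put(key(topic, partition), totalRestored);
        System.out.printf("Restored %s from %s: %d records in total%n",
                storeName, key(topic, partition), totalRestored);
    }

    /** True if a restore for this changelog partition began at offset 0. */
    public boolean restoredFromBeginning(String topic, int partition) {
        Long start = startOffsets.get(key(topic, partition));
        return start != null && start == 0L;
    }

    /** Total records replayed for this changelog partition, or 0 if none. */
    public long totalRestored(String topic, int partition) {
        return totals.getOrDefault(key(topic, partition), 0L);
    }
}
```

In the real application the class would be declared as implementing
StateRestoreListener, take a TopicPartition in each callback, and be registered
with KafkaStreams#setGlobalStateRestoreListener before the streams instance is
started. The logged starting offset then shows whether a given changelog
partition is being replayed from the beginning.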
