It seems this question was cross-posted on SO: https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
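
Regarding the StateRestoreListener suggestion quoted further down: a rough sketch of the bookkeeping such a listener could do, to see from which offset a changelog partition is restored and how many records are replayed. This is only an illustration, not code from the application in question. To keep it runnable without the Kafka jars, the class below is a hypothetical stand-in (RestoreProgressLog and its helper methods are made-up names) that mirrors the three callbacks of org.apache.kafka.streams.processor.StateRestoreListener but takes a plain String where the real interface takes a TopicPartition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free stand-in for a StateRestoreListener.
// Assumption: in a real application this class would implement
// org.apache.kafka.streams.processor.StateRestoreListener and receive a
// TopicPartition instead of the plain String used here.
class RestoreProgressLog {
    private final Map<String, Long> startOffsets = new LinkedHashMap<>();
    private final Map<String, Long> restoredCounts = new LinkedHashMap<>();
    private final List<String> lines = new ArrayList<>();

    // Called once per changelog partition before restoration begins.
    public void onRestoreStart(String topicPartition, String storeName,
                               long startingOffset, long endingOffset) {
        startOffsets.put(topicPartition, startingOffset);
        restoredCounts.put(topicPartition, 0L);
        lines.add(String.format("start store=%s partition=%s from=%d to=%d",
                storeName, topicPartition, startingOffset, endingOffset));
    }

    // Called after each restored batch; accumulates the record count.
    public void onBatchRestored(String topicPartition, String storeName,
                                long batchEndOffset, long numRestored) {
        restoredCounts.merge(topicPartition, numRestored, Long::sum);
    }

    // Called once restoration of the partition has finished.
    public void onRestoreEnd(String topicPartition, String storeName,
                             long totalRestored) {
        lines.add(String.format("end store=%s partition=%s restored=%d",
                storeName, topicPartition, totalRestored));
    }

    // True if restoration for this partition began at offset 0.
    public boolean restoredFromBeginning(String topicPartition) {
        Long start = startOffsets.get(topicPartition);
        return start != null && start == 0L;
    }

    // Number of records reported via onBatchRestored so far.
    public long restoredSoFar(String topicPartition) {
        return restoredCounts.getOrDefault(topicPartition, 0L);
    }

    public List<String> lines() {
        return lines;
    }

    public static void main(String[] args) {
        RestoreProgressLog log = new RestoreProgressLog();
        String tp = "example-changelog-8"; // hypothetical partition name
        log.onRestoreStart(tp, "example-store", 0L, 1000L);
        log.onBatchRestored(tp, "example-store", 499L, 500L);
        log.onBatchRestored(tp, "example-store", 999L, 500L);
        log.onRestoreEnd(tp, "example-store", log.restoredSoFar(tp));
        log.lines().forEach(System.out::println);
    }
}
```

In a real application the same three methods would live in a class implementing StateRestoreListener, registered via KafkaStreams#setGlobalStateRestoreListener before the instance is started. A starting offset of 0 on every restart would then confirm that the changelog is being replayed from the beginning.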

On 1/14/19 8:49 AM, Jonathan Santilli wrote:
> Hello Bill, thanks a lot for the reply,
> I will implement your recommendation about the
> *KafkaStreams#setGlobalStateRestoreListener.*
>
> About your question:
>
> *When you say you have used both "exactly once" and "at least once" for the*
> *"at least once" case did you run for a while in that mode then restart?*
>
> *Yes, I have done that among other combinations, but the same behaviour.*
>
> This is what I see in the logs after restart:
>
> INFO [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
> internals.StoreChangelogReader (StoreChangelogReader.java:215) -
> stream-thread [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
> No checkpoint found for task 1_8 state store
> KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> *APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8* with EOS turned
> on. Reinitializing the task and restore its state from the beginning.
>
> INFO [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
> internals.Fetcher (Fetcher.java:583) - [Consumer
> clientId=*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3-restore-consumer,
> groupId=] Resetting offset for partition
> *APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8
> to offset 0*.
>
>
> Before I restart, I always check the LAG for the consumer group (*APP-ID*)
> reading from the output topic 'outPutTopicNameOfGroupedData' to verify it is
> 1. Immediately after the restart, and after verifying the logs above, the
> LAG for that consumer group (*APP-ID*) reading from the output topic
> 'outPutTopicNameOfGroupedData' goes up, increasing so much that the App
> reading from the 'outPutTopicNameOfGroupedData' topic is re-processing the
> data again.
>
> I hope someone can give me some clue, I would really appreciate it.
>
>
> Cheers!
> --
> Jonathan
>
>
> On Mon, Jan 14, 2019 at 4:12 PM Bill Bejeck <b...@confluent.io> wrote:
>
>> Hi Jonathan,
>>
>> With EOS enabled, Kafka Streams does not use checkpoint files for restoring
>> state stores; it will replay the data contained in the changelog topic.
>> But this should not affect where the input source topic(s) resume after a
>> restart; also, the changelog topics are only consumed from during a restore
>> (or for keeping standby tasks up to date).
>>
>> When you say you have used both "exactly once" and "at least once": for the
>> "at least once" case, did you run for a while in that mode and then restart?
>> You can confirm how much data, and from which offset, Streams is restoring
>> a state store by using a custom implementation of the StateRestoreListener
>> interface and setting it via KafkaStreams#setGlobalStateRestoreListener.
>>
>> -Bill
>>
>>
>> On Mon, Jan 14, 2019 at 7:32 AM Jonathan Santilli <
>> jonathansanti...@gmail.com> wrote:
>>
>>> I have a Kafka Streams application for which, whenever I restart it, the
>>> offsets for the topic partitions (*KTABLE-SUPPRESS-STATE-STORE*) it is
>>> consuming get reset to 0. Hence, for all partitions, the lags increase and
>>> the app needs to reprocess all the data.
>>>
>>> I have ensured the lag is 1 for every partition before the restart. All
>>> consumers that belong to that consumer-group-id (app-id) are active. The
>>> restart is immediate, it takes around 30 secs.
>>>
>>> The app is using exactly once as processing guarantee.
>>>
>>> I have read this answer: How does an offset expire for an Apache Kafka
>>> consumer group?
>>> <https://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group>.
>>>
>>> I have tried with *auto.offset.reset = latest* and *auto.offset.reset =
>>> earliest*.
>>>
>>> I assume that after the restart the app should pick up from the latest
>>> committed offset for that consumer group.
>>>
>>> Is it possible to know why the offsets are getting reset to 0?
>>>
>>> I would really appreciate any clue about this.
>>>
>>> This is the code the App executes:
>>>
>>> final StreamsBuilder builder = new StreamsBuilder();
>>> final KStream<..., ...> events = builder
>>>     .stream(inputTopicNames, Consumed.with(..., ...)
>>>         .withTimestampExtractor(...));
>>>
>>> events
>>>     .filter((k, v) -> ...)
>>>     .flatMapValues(v -> ...)
>>>     .flatMapValues(v -> ...)
>>>     .selectKey((k, v) -> v)
>>>     .groupByKey(Grouped.with(..., ...))
>>>     .windowedBy(
>>>         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
>>>             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
>>>             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
>>>     .reduce((agg, next) -> {
>>>         ...
>>>         return agg;
>>>     })
>>>     .suppress(Suppressed.untilWindowCloses(
>>>         Suppressed.BufferConfig.unbounded()))
>>>     .toStream()
>>>     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
>>>
>>> The offset reset only, and always, happens (after restarting) with the
>>> *KTABLE-SUPPRESS-STATE-STORE* internal topic created by the Kafka Streams
>>> API.
>>>
>>> I have tried with the processing guarantee *exactly once* and *at least
>>> once*.
>>>
>>> Once again, I would really appreciate any clue about this.
>>> P.S: I have also posted the question on *SO*:
>>>
>>> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
>>>
>>>
>>> Cheers!
>>> --
>>> Santilli Jonathan