Hi Bruno

By "clean up" I mean this: we have an environment variable that, when set,
makes the application call KafkaStreams.cleanUp() on startup and then sleep.

The changelog topic is an internal KafkaStreams topic, and I'm not changing
any policies on it. As I understand it, it should have whatever the default
policy is for a KTable changelog.

Thanks
Murilo

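A minimal sketch of what such a startup switch could look like. The variable name STREAMS_CLEANUP_ON_START, the class and method names, and the sleep duration are all hypothetical and are not taken from the application discussed here. The cleanup action is passed in as a Runnable so the sketch runs without a Kafka dependency; in a real application it would presumably be streams::cleanUp, which has to be called before start() or after close().

```java
import java.util.Map;

class CleanupSwitch {
    // Hypothetical variable name; the thread does not say what the real one is.
    static final String CLEANUP_ENV = "STREAMS_CLEANUP_ON_START";

    /** Returns true when the (hypothetical) switch is set to a non-blank value. */
    static boolean cleanupRequested(Map<String, String> env) {
        String value = env.get(CLEANUP_ENV);
        return value != null && !value.trim().isEmpty();
    }

    /**
     * Runs the cleanup action and then sleeps, but only if the switch is set.
     * The Runnable is a stand-in for KafkaStreams#cleanUp(), which wipes the
     * local state directory of the application.
     * Returns true if the cleanup action was executed.
     */
    static boolean cleanUpAndSleepIfRequested(Map<String, String> env,
                                              Runnable cleanUp,
                                              long sleepMillis) {
        if (!cleanupRequested(env)) {
            return false;
        }
        cleanUp.run();
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        }
        return true;
    }

    public static void main(String[] args) {
        boolean ran = cleanUpAndSleepIfRequested(
                System.getenv(),
                () -> System.out.println("stand-in for KafkaStreams#cleanUp()"),
                1000L);
        System.out.println(ran ? "cleanup executed" : "cleanup skipped");
    }
}
```

Note that this only wipes local state. It does not touch the changelog topics or committed offsets on the brokers, which is what a full application reset would do.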
On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna <br...@confluent.io.invalid> wrote:

> Hi Murilo,
>
> A couple of questions:
>
> 1. What do you mean exactly with clean up?
> 2. Do you have a cleanup policy specified on the changelog topics?
>
> Best,
> Bruno
>
> On 15.03.21 15:03, Murilo Tavares wrote:
> > Hi Bruno
> > No, I haven't tested resetting the application before upgrading on my
> > large environment. But I was able to reproduce it in my dev environment,
> > which is way smaller.
> > This is what I did:
> > - Clean up and downgrade to 2.4.
> > - Let it catch up;
> > - upgrade to 2.7; Same errors, but it caught up after a while;
> >
> > Then I tried these steps:
> > - Clean up and downgrade to 2.4.
> > - Let it catch up;
> > - Clean up and upgrade to 2.7. No error this time.
> >
> > Thanks
> > Murilo
> >
> > On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna <br...@confluent.io.invalid>
> > wrote:
> >
> >> Hi Murilo,
> >>
> >> Did you retry to upgrade again after you reset the application? Did it
> >> work?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 15.03.21 14:26, Murilo Tavares wrote:
> >>> Hi Bruno
> >>> Thanks for your response.
> >>> No, I did not reset the application prior to upgrading. That was simply
> >>> upgrading KafkaStreams from 2.4 to 2.7.
> >>>
> >>> I was able to reproduce it on a smaller environment, and it does indeed
> >>> recover.
> >>> In a large environment, though, it keeps like that for hours. In this
> >>> same large environment, I had to downgrade the application, and when
> >>> doing that I did reset the application, which just took a few minutes.
> >>>
> >>> Thanks
> >>> Murilo
> >>>
> >>> On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna <br...@confluent.io.invalid>
> >>> wrote:
> >>>
> >>>> Hi Murilo,
> >>>>
> >>>> No, you do not need any special procedure to upgrade from 2.4 to 2.7.
> >>>>
> >>>> What you see in the logs is not an error but a warning. It should not
> >>>> block you on startup forever. The warning says that the local states
> >>>> of task 7_17 are corrupted because the offset you want to fetch of the
> >>>> state changelog topic partition
> >>>> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is
> >>>> larger or smaller than the offsets that exist on the brokers for that
> >>>> partition. If Streams runs into such an exception it will recreate the
> >>>> state from scratch which might take a while depending on the size of
> >>>> the state.
> >>>>
> >>>> The root cause of this warning is not clear from the information you
> >>>> gave. Did you maybe reset the application but not wipe out the local
> >>>> state stores?
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 12.03.21 19:11, Murilo Tavares wrote:
> >>>>> Hi
> >>>>> I have Kafka brokers running on version 2.4.1, with a KafkaStreams
> >>>>> app on 2.4.0.
> >>>>> I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my
> >>>>> instances stuck on startup.
> >>>>> In my understanding, I don't need any special procedure to upgrade
> >>>>> from KStreams 2.4.0 to 2.7.0, right?
> >>>>>
> >>>>> The following error stands out for me:
> >>>>>
> >>>>> 2021-03-12 16:23:52.005 [...] WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [...] Detected the states of tasks {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
> >>>>> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted and hence needs to be re-initialized
> >>>>>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?]
> >>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?]
> >>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?]
> >>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?]
> >>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?]
> >>>>> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: euw1-az1)], epoch=27}} is out of range for partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
> >>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?]
> >>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?]
> >>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?]
> >>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?]
> >>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?]
> >>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?]
> >>>>>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?]
> >>>>>     ... 4 more
> >>>>>
> >>>>> Any suggestions on how to upgrade?
> >>>>> Thanks
> >>>>> Murilo
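
To make the condition behind the warning in the quoted thread concrete: the restore fails when the offset Streams wants to resume from falls outside the range of offsets the broker still holds for the changelog partition. Below is a minimal sketch of that range check only, not of the Streams internals. The fetch offset 738 comes from the log above; the broker-side start and end offsets are hypothetical, since the thread does not report them, and the class and method names are made up for illustration.

```java
class ChangelogOffsetCheck {
    /**
     * Returns true when fetchOffset lies outside the offsets the broker holds
     * for a partition. Fetching exactly at the log end offset is valid (there
     * is simply nothing to read yet), so the range is treated as inclusive.
     */
    static boolean isOutOfRange(long fetchOffset, long logStartOffset, long logEndOffset) {
        return fetchOffset < logStartOffset || fetchOffset > logEndOffset;
    }

    public static void main(String[] args) {
        long fetchOffset = 738L; // from the OffsetOutOfRangeException in the log

        // Hypothetical broker state: older records were removed, so the log
        // now starts after the locally checkpointed offset.
        System.out.println("log start moved forward: "
                + isOutOfRange(fetchOffset, 1000L, 5000L));

        // Hypothetical broker state: the topic was recreated or reset, so the
        // log ends before the locally checkpointed offset.
        System.out.println("log shorter than checkpoint: "
                + isOutOfRange(fetchOffset, 0L, 500L));

        // Hypothetical broker state: the checkpoint is still inside the log.
        System.out.println("checkpoint still valid: "
                + isOutOfRange(fetchOffset, 0L, 738L));
    }
}
```

Either out-of-range case would be consistent with local state that no longer matches the changelog, which is why wiping the local state before the upgrade avoids the warning.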