Hi Murilo,
A couple of questions:
1. What exactly do you mean by clean up?
2. Do you have a cleanup policy specified on the changelog topics?
Best,
Bruno
On 15.03.21 15:03, Murilo Tavares wrote:
Hi Bruno
No, I haven't tested resetting the application before upgrading on my
large environment. But I was able to reproduce it in my dev environment,
which is way smaller.
This is what I did:
- Clean up and downgrade to 2.4.
- Let it catch up;
- Upgrade to 2.7. Same errors, but it caught up after a while.
Then I tried these steps:
- Clean up and downgrade to 2.4.
- Let it catch up;
- Clean up and upgrade to 2.7. No error this time.
Thanks
Murilo
On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna <br...@confluent.io.invalid> wrote:
Hi Murilo,
Did you try upgrading again after you reset the application? Did it
work?
Best,
Bruno
On 15.03.21 14:26, Murilo Tavares wrote:
Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. I simply
upgraded KafkaStreams from 2.4 to 2.7.
I was able to reproduce it on a smaller environment, and it does indeed
recover.
In a large environment, though, it stays like that for hours. In this
same large environment, I had to downgrade the application, and when
doing that I did reset the application, which took just a few minutes.
Thanks
Murilo
On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna <br...@confluent.io.invalid> wrote:
Hi Murilo,
No, you do not need any special procedure to upgrade from 2.4 to 2.7.
What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local states of
task 7_17 are corrupted because the offset you want to fetch from the
state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger
or smaller than the offsets that exist on the brokers for that
partition. If Streams runs into such an exception, it will recreate the
state from scratch, which might take a while depending on the size of
the state.
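To make the "larger or smaller than the offsets that exist on the brokers" condition concrete, here is a minimal sketch of that range check. It is only an illustration, not the actual Streams or consumer code. The class ChangelogOffsetCheck and its method are hypothetical, and apart from offset=738 (taken from the log further down) the broker-side offsets are made-up values.

```java
// Hypothetical helper, not part of Kafka Streams or the Kafka clients.
// It only illustrates when a locally checkpointed changelog offset can
// no longer be fetched from the broker.
final class ChangelogOffsetCheck {

    /**
     * Returns true if the checkpointed offset lies outside the range
     * [logStartOffset, logEndOffset] that the broker currently holds for
     * the changelog partition.
     */
    static boolean isOutOfRange(long checkpointedOffset, long logStartOffset, long logEndOffset) {
        return checkpointedOffset < logStartOffset || checkpointedOffset > logEndOffset;
    }

    public static void main(String[] args) {
        long checkpointed = 738L; // fetch position reported in the warning

        // Assumed broker offsets, for illustration only.
        // Case 1: older records were removed, so the log now starts after 738.
        System.out.println(isOutOfRange(checkpointed, 1_000L, 5_000L));
        // Case 2: the topic was recreated or truncated and ends before 738.
        System.out.println(isOutOfRange(checkpointed, 0L, 500L));
        // Case 3: 738 is still within the log, so restoration can resume from it.
        System.out.println(isOutOfRange(checkpointed, 0L, 5_000L));
    }
}
```

In the first two cases the locally stored position no longer matches what is on the broker, which is the situation in which the state has to be rebuilt from the changelog.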
The root cause of this warning is not clear from the information you
gave. Did you maybe reset the application but not wipe out the local
state stores?
Best,
Bruno
On 12.03.21 19:11, Murilo Tavares wrote:
Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app
on 2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances
stuck on startup.
In my understanding, I don't need any special procedure to upgrade from
KStreams 2.4.0 to 2.7.0, right?
The following error stands out for me:
2021-03-12 16:23:52.005 [...] WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [...] Detected the states of tasks {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted and hence needs to be re-initialized
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: euw1-az1)], epoch=27}} is out of range for partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?]
    ... 4 more
Any suggestions on how to upgrade?
Thanks
Murilo