Hi Murilo,

OK, now I see why you do not get an error in the second case in your small environment, where you cleaned up before upgrading. After a cleanup you restore from the earliest offset anyway, and that offset is defined by the broker and always exists. Hence, no out-of-range exception is thrown.

I am still wondering why you get an out-of-range exception after upgrading without cleaning up, though.

A solution would be to clean up before upgrading in your large environment. I do not know if this is a viable solution for you.

Best,
Bruno

On 15.03.21 16:01, Murilo Tavares wrote:
Hi Bruno
We have an environment variable that, when set, makes the application call
KafkaStreams.cleanUp() and then sleep.
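Roughly, the gating looks like the sketch below (the variable name
CLEANUP_AND_SLEEP, the application id, brokers and topology are placeholders,
not our actual code):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StartupCleanup {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder brokers
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic"); // placeholder source; the real topology goes here

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // When the flag is set, wipe the local state directories for this
        // application.id and then block so the instance stays idle.
        if (System.getenv("CLEANUP_AND_SLEEP") != null) {
            streams.cleanUp();              // only allowed before start() or after close()
            Thread.sleep(Long.MAX_VALUE);
        }

        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}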
The changelog topic is an internal KafkaStreams topic, for which I'm not
changing any policies; it should have the default policy for a KTable, in
my understanding.
Thanks
Murilo



On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna <br...@confluent.io.invalid>
wrote:

Hi Murilo,

A couple of questions:

1. What do you mean exactly with clean up?
2. Do you have a cleanup policy specified on the changelog topics?
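If it helps, this is roughly how you could check the effective policy on the
changelog topic with the AdminClient (the bootstrap server is a placeholder;
the topic name is taken from your log):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class ShowCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        String topic = "my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog";
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);

        try (AdminClient admin = AdminClient.create(props)) {
            Config config = admin.describeConfigs(Collections.singleton(resource))
                                 .all().get().get(resource);
            // Streams changelogs backing a KTable are compacted by default
            // (cleanup.policy=compact).
            System.out.println("cleanup.policy = " + config.get("cleanup.policy").value());
        }
    }
}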

Best,
Bruno

On 15.03.21 15:03, Murilo Tavares wrote:
Hi Bruno
No, I haven't tested resetting the application before upgrading in my large
environment. But I was able to reproduce it in my dev environment, which is
way smaller.
This is what I did:
- Clean up and downgrade to 2.4;
- Let it catch up;
- Upgrade to 2.7. Same errors, but it caught up after a while.

Then I tried these steps:
- Clean up and downgrade to 2.4;
- Let it catch up;
- Clean up and upgrade to 2.7. No error this time.

Thanks
Murilo

On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna <br...@confluent.io.invalid>
wrote:

Hi Murilo,

Did you retry the upgrade after you reset the application? Did it work?

Best,
Bruno

On 15.03.21 14:26, Murilo Tavares wrote:
Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. That was simply
upgrading KafkaStreams from 2.4 to 2.7.

I was able to reproduce it in a smaller environment, and it does indeed
recover.
In a large environment, though, it stays like that for hours. In this same
large environment I had to downgrade the application, and when doing that
I did reset the application, which only took a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna <br...@confluent.io.invalid>
wrote:

Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local state of
task 7_17 is corrupted because the offset Streams wants to fetch from the
state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger or
smaller than the offsets that exist on the brokers for that partition. If
Streams runs into such an exception, it will recreate the state from
scratch, which might take a while depending on the size of the state.
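If you want to double-check what the brokers have for that partition, a
minimal AdminClient sketch would be something like the following (the
bootstrap server is a placeholder; topic and partition are taken from your
log; your 2.7 client has the listOffsets API):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class ShowChangelogOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        TopicPartition tp = new TopicPartition(
            "my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog", 17);

        try (AdminClient admin = AdminClient.create(props)) {
            long earliest = admin.listOffsets(Map.of(tp, OffsetSpec.earliest()))
                                 .partitionResult(tp).get().offset();
            long latest = admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
                                 .partitionResult(tp).get().offset();
            // The fetch position from your log (738) has to lie between these two
            // values, otherwise the restore consumer throws OffsetOutOfRangeException.
            System.out.println("earliest=" + earliest + ", latest=" + latest);
        }
    }
}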

The root cause of this warning is not clear from the information you
gave. Did you maybe reset the application but not wipe out the local
state stores?

Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:
Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app
on 2.4.0.
I'm trying to upgrade my KafkaStreams app to v2.7.0, but my instances got
stuck on startup.
In my understanding, I don't need any special procedure to upgrade from
KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

2021-03-12 16:23:52.005 [...] WARN
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[...] Detected the states of tasks
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted. Will close the task as dirty and re-create and bootstrap from
scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted and hence needs to be re-initialized
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
euw1-az1)], epoch=27}} is out of range for partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?]
    ... 4 more

Any suggestions on how to upgrade?
Thanks
Murilo






