[ https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688798#comment-16688798 ]
John Roesler commented on KAFKA-7443:
-------------------------------------

Hi [~linyli],

Wow, this seems like a pretty bad condition.

Since you already have a patch for it, would you like to open a pull request on [https://github.com/apache/kafka]? This would make it easier to review and comment on your proposed fix.

Thank you!
-John

> OffsetOutOfRangeException in restoring state store from changelog topic when
> start offset of local checkpoint is smaller than that of changelog topic
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7443
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7443
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: linyue li
>            Assignee: John Roesler
>            Priority: Major
>              Labels: feather
>
> When restoring local state store from a changelog topic in EOS, kafka stream
> will sometimes throw out the OffsetOutOfRangeException such as:
> {code:java}
> Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112}
>     at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734)
> {code}
>
> This scenario occurs when the changelog topic has deleted expired log segments
> according to retention.ms, but the start offset in the local .checkpoint
> file is the position at which the task last exited from this instance, which may
> be smaller than the updated beginning offset of the changelog topic. Restoring the
> store from the start offset in the checkpoint file then throws the exception.
> It can be reproduced as follows (Kafka Streams running in EOS):
> # The task for topic partition test-1 is running on instance A. When the task exits,
> Kafka Streams writes the last committed offset 100 for test-1 to the checkpoint
> file.
> # Task test-1 is transferred to instance B.
> # During this time, the remote changelog topic for test-1 advances its start
> offset to 120, because the old log segment reaches its retention time and is deleted.
> # After a while, task test-1 exits from instance B and resumes on instance
> A, and the task restores the local state store on A from checkpoint offset 100, which
> is smaller than the valid start offset 120 of the changelog topic. The exception
> is then thrown.
> When this exception occurs, Kafka Streams tries to reinitialize the task and
> intends to restore from the beginning in the catch block below. Unfortunately, this
> handling does not work, and the task keeps throwing OffsetOutOfRangeException in
> the subsequent restore attempts.
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> //handle for OffsetOutOfRangeException in kafka stream
> catch (final InvalidOffsetException recoverableException) {
>     log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
>     final Set<TopicPartition> partitions = recoverableException.partitions();
>     for (final TopicPartition partition : partitions) {
>         final StreamTask task = active.restoringTaskFor(partition);
>         log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
>         needsInitializing.remove(partition);
>         needsRestoring.remove(partition);
>
>         task.reinitializeStateStoresForPartitions(recoverableException.partitions());
>     }
>     restoreConsumer.seekToBeginning(partitions);
> }
> {code}
>
> Investigating why the handling of this exception does not work, I found the root
> cause:
> Kafka Streams registers state restorers in the variable stateRestorers,
> which is used to read/update the start and end offsets for restoring the local
> state store.
> {code:java}
> //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
> {code}
> When the OffsetOutOfRangeException occurs, Kafka Streams should update the
> checkpoint offset in the state restorer of this topic partition to the
> "NO_CHECKPOINT" state, so that the next time the task can restore from the beginning
> offset of the remote changelog topic and resolve this issue.
> But in the catch block above,
> task.reinitializeStateStoresForPartitions(recoverableException.partitions())
> does not actually update the checkpoint offset in stateRestorers, so the next
> time it still resumes from the original invalid offset and gets stuck in this
> exception.
> I made a fix for this bug by updating the checkpoint offset for this
> stateRestorer, and I validated that it works for this issue. The modified code is
> as below:
> {code:java}
> catch (final InvalidOffsetException recoverableException) {
>     log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
>     final Set<TopicPartition> partitions = recoverableException.partitions();
>     for (final TopicPartition partition : partitions) {
>         final StreamTask task = active.restoringTaskFor(partition);
>         log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
>         needsInitializing.remove(partition);
>         needsRestoring.remove(partition);
>         // added by linyli
>         final StateRestorer restorer = stateRestorers.get(partition);
>         restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
>
>         task.reinitializeStateStoresForPartitions(recoverableException.partitions());
>     }
>     restoreConsumer.seekToBeginning(partitions);
> }
> {code}
> Any comments on this issue are welcome.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
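
For illustration only, the failure mode described in the issue can be modeled without any Kafka dependency. The sketch below is not Kafka Streams code: the class `CheckpointResetSketch`, its methods, the string partition key, and the sentinel value `-1` for `NO_CHECKPOINT` are all hypothetical stand-ins. It assumes only what the report states, namely that a cached checkpoint offset below the changelog's start offset makes every restore attempt fail until the cached value is reset.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the reported bug; none of these names are Kafka APIs.
final class CheckpointResetSketch {
    // Hypothetical sentinel standing in for StateRestorer.NO_CHECKPOINT.
    static final long NO_CHECKPOINT = -1L;

    // Offset at which a restore attempt would start reading the changelog.
    static long restoreStartOffset(long checkpoint, long logStartOffset) {
        return checkpoint == NO_CHECKPOINT ? logStartOffset : checkpoint;
    }

    // Simulates one restore attempt. Returns false when the start offset is
    // below the changelog's start offset (the out-of-range case). When
    // resetOnFailure is true, the cached checkpoint is cleared, mirroring the
    // idea of the proposed patch.
    static boolean tryRestore(Map<String, Long> checkpoints, String partition,
                              long logStartOffset, boolean resetOnFailure) {
        long start = restoreStartOffset(checkpoints.get(partition), logStartOffset);
        if (start < logStartOffset) {
            if (resetOnFailure) {
                checkpoints.put(partition, NO_CHECKPOINT);
            }
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // Offsets taken from the reproduction steps: checkpoint 100, log start 120.
        Map<String, Long> withoutFix = new HashMap<>();
        withoutFix.put("test-1", 100L);
        Map<String, Long> withFix = new HashMap<>();
        withFix.put("test-1", 100L);

        for (int attempt = 1; attempt <= 2; attempt++) {
            System.out.println("attempt " + attempt
                + " without reset: " + tryRestore(withoutFix, "test-1", 120L, false)
                + ", with reset: " + tryRestore(withFix, "test-1", 120L, true));
        }
    }
}
```

In this model the variant without the reset fails on every attempt, while the variant with the reset fails once and then restores from the changelog's start offset, which matches the behavior the reporter describes for the patched catch block.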