[ https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688798#comment-16688798 ]

John Roesler commented on KAFKA-7443:
-------------------------------------

Hi [~linyli],

Wow, this seems like a pretty bad condition.

Since you already have a patch for it, would you like to open a pull request on 
[https://github.com/apache/kafka]?

This would make it easier to review and comment on your proposed fix.

Thank you!

-John

> OffsetOutOfRangeException in restoring state store from changelog topic when 
> start offset of local checkpoint is smaller than that of changelog topic
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7443
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7443
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: linyue li
>            Assignee: John Roesler
>            Priority: Major
>              Labels: feather
>
> When restoring a local state store from its changelog topic under EOS, Kafka 
> Streams will sometimes throw an OffsetOutOfRangeException such as:
> {code:java}
> Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112}
>  at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950)
>  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157)
>  at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89)
>  at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734)
> {code}
>  
> This scenario occurs when the changelog topic has deleted expired log segments 
> according to retention.ms, while the start offset in the local .checkpoint file 
> is the position at which the task last exited on this instance, which may be 
> smaller than the updated beginning offset of the changelog topic. Restoring the 
> store from the start offset in the checkpoint file then throws the exception.
> It can be reproduced as below (Kafka Streams runs with EOS enabled); a minimal 
> consumer sketch of the failure mode follows the list:
>  # The task for topic partition test-1 is running on instance A. When the task 
> exits, Kafka Streams writes the last committed offset 100 for test-1 to the 
> checkpoint file.
>  # Task test-1 is migrated to instance B.
>  # During this time, the remote changelog topic for test-1 advances its start 
> offset to 120, as the old log segment reaches its retention time and is deleted.
>  # After a while, task test-1 exits from instance B and resumes on instance A, 
> and the task restores instance A's local state store from checkpoint offset 100, 
> which is smaller than the valid start offset 120 of the changelog topic. The 
> exception above is thrown.
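> For illustration, the failure mode can be reproduced with a plain consumer. Like 
> the Streams restore consumer, it configures no reset policy 
> ("auto.offset.reset" = "none"), so seeking below the log start offset makes 
> poll() throw instead of silently resetting. This is a minimal sketch, not the 
> Streams code; the topic name and offsets are hypothetical, mirroring the steps 
> above:
> {code:java}
> import java.time.Duration;
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> 
> public class RestoreFailureSketch {
>   public static void main(final String[] args) {
>     final Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     // No reset policy: out-of-range offsets surface as exceptions.
>     props.put("auto.offset.reset", "none");
>     props.put("key.deserializer", ByteArrayDeserializer.class.getName());
>     props.put("value.deserializer", ByteArrayDeserializer.class.getName());
> 
>     // Hypothetical changelog partition.
>     final TopicPartition changelog = new TopicPartition("test-changelog", 1);
> 
>     try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>       consumer.assign(Collections.singleton(changelog));
>       consumer.seek(changelog, 100L);        // checkpoint offset 100...
>       consumer.poll(Duration.ofMillis(100)); // ...but log start is 120
>     } catch (final OffsetOutOfRangeException e) {
>       System.err.println("Offsets out of range for: " + e.partitions());
>     }
>   }
> }
> {code}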
> When this exception occurs, Kafka Streams tries to reinitialize the task and 
> restore it from the beginning in the catch block below. Unfortunately, this 
> handler does not work, and the task keeps throwing OffsetOutOfRangeException in 
> all subsequent restore attempts.
> {code:java}
> // org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> // handler for OffsetOutOfRangeException in Kafka Streams
> catch (final InvalidOffsetException recoverableException) {
>   log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
>   final Set<TopicPartition> partitions = recoverableException.partitions();
>   for (final TopicPartition partition : partitions) {
>     final StreamTask task = active.restoringTaskFor(partition);
>     log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
>     needsInitializing.remove(partition);
>     needsRestoring.remove(partition);
>     task.reinitializeStateStoresForPartitions(recoverableException.partitions());
>   }
>   restoreConsumer.seekToBeginning(partitions);
> }
> {code}
>  
>  Investigating why the handler for this exception does not work, I found the 
> root cause:
>  Kafka Streams registers state restorers in the variable stateRestorers, which 
> is used to read and update the start and end offsets for restoring the local 
> state store.
> {code:java}
> // org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
> {code}
> When the OffsetOutOfRangeException occurs, Kafka Streams should update the 
> checkpoint offset in the state restorer of this topic partition to the 
> NO_CHECKPOINT state; the next time, the task can then restore from the beginning 
> offset of the remote changelog topic, which resolves the issue.
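> To make that mechanism concrete, here is a paraphrased sketch (not the exact 
> Streams source) of how a restorer's checkpoint determines where restoration 
> starts; once the checkpoint is NO_CHECKPOINT, the restore consumer seeks to the 
> beginning of the changelog:
> {code:java}
> // Paraphrased restoration setup: a valid checkpoint wins;
> // otherwise restore from the current log start offset.
> if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
>     restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
> } else {
>     restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
> }
> {code}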
> But in the catch block above, 
> task.reinitializeStateStoresForPartitions(recoverableException.partitions()) 
> does not actually update the checkpoint offset in stateRestorers, so the next 
> attempt still resumes from the original invalid offset and gets stuck on this 
> exception.
> I made a fix for this bug by updating the checkpoint offset of the affected 
> stateRestorer, and I validated that it resolves the issue. The modified code is 
> as below:
> {code:java}
> catch (final InvalidOffsetException recoverableException) {
>   log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
>   final Set<TopicPartition> partitions = recoverableException.partitions();
>   for (final TopicPartition partition : partitions) {
>     final StreamTask task = active.restoringTaskFor(partition);
>     log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
>     needsInitializing.remove(partition);
>     needsRestoring.remove(partition);
>     // added by linyli: reset the restorer's checkpoint so the next restore
>     // attempt starts from the beginning of the changelog topic
>     final StateRestorer restorer = stateRestorers.get(partition);
>     restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
>     task.reinitializeStateStoresForPartitions(recoverableException.partitions());
>   }
>   restoreConsumer.seekToBeginning(partitions);
> }
> {code}
>  Any comments on this issue are welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
