[jira] [Commented] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716567#comment-16716567
 ] 

ASF GitHub Bot commented on KAFKA-7443:
---

mjsax closed pull request #5946: KAFKA-7443: OffsetOutOfRangeException in 
restoring state store from changelog topic when start offset of local 
checkpoint is smaller than that of changelog topic
URL: https://github.com/apache/kafka/pull/5946
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index e43c292e6a3..f877f9d13a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -188,6 +188,10 @@ public synchronized void unsubscribe() {
 if (!subscriptions.isPaused(entry.getKey())) {
 final List<ConsumerRecord<K, V>> recs = entry.getValue();
 for (final ConsumerRecord<K, V> rec : recs) {
+if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) {
+throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey())));
+}
+
 if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
 results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
 subscriptions.position(entry.getKey(), rec.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34e6e5cdb6f..fdd9d6c303c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -107,6 +107,8 @@ public void register(final StateRestorer restorer) {
 needsInitializing.remove(partition);
 needsRestoring.remove(partition);
 
+final StateRestorer restorer = stateRestorers.get(partition);
+restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
 task.reinitializeStateStoresForPartitions(recoverableException.partitions());
 }
 restoreConsumer.seekToBeginning(partitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 34f0a32b88c..d08f0d7360d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -157,6 +157,42 @@ public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
 assertThat(callback.restored.size(), equalTo(messages));
 }
 
+@Test
+public void shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() {
+final int messages = 10;
+final int startOffset = 5;
+final long expiredCheckpoint = 1L;
+assignPartition(messages, topicPartition);
+consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) startOffset));
+consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + startOffset)));
+
+addRecords(messages, topicPartition, startOffset);
+consumer.assign(Collections.emptyList());
+
+final StateRestorer stateRestorer = new StateRestorer(
+topicPartition,
+restoreListener,
+expiredCheckpoint,
+Long.MAX_VALUE,
+true,
+"storeName");
+changelogReader.register(stateRestorer);
+
+EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+EasyMock.replay(active, task);
+
+// first restore call "fails" since OffsetOutOfRangeException but we should not die with an exception
+assertEquals(0, changelogReader.restore(active).size());
+//the starting offset for stateRestorer is set to NO_CHECKPOINT
+assertThat(stateRestorer.checkpoint(), equalTo(-1L));
+
+//restore the active task again
+
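
The MockConsumer hunk above makes the mock behave like a real consumer that has no reset policy: if a beginning offset has been set for the partition and it lies past the consumer's position, poll fails instead of silently skipping ahead. The following is a minimal, dependency-free Java sketch of that guard. `GuardedChangelog` and its `OutOfRange` exception are hypothetical stand-ins, not kafka-clients types, and for simplicity the check runs once per poll rather than once per record as in the real hunk. It replays the numbers from the new unit test (10 records starting at offset 5, expired checkpoint at offset 1).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of the guard added to MockConsumer.poll().
final class GuardedChangelog {

    /** Stand-in for OffsetOutOfRangeException; this sketch does not use kafka-clients. */
    static final class OutOfRange extends RuntimeException {
        final long position;

        OutOfRange(long position) {
            super("Offset out of range: " + position);
            this.position = position;
        }
    }

    private final long beginningOffset; // first offset still present in the log
    private final long endOffset;       // offset of the next record to be written
    private long position;              // next offset this consumer will read

    GuardedChangelog(long beginningOffset, long endOffset, long position) {
        this.beginningOffset = beginningOffset;
        this.endOffset = endOffset;
        this.position = position;
    }

    void seekToBeginning() {
        position = beginningOffset;
    }

    /** Returns the offsets from the current position to the end, or throws if they were deleted. */
    List<Long> poll() {
        if (beginningOffset > position) {
            // Retention removed the records at 'position': fail like a consumer with no reset policy.
            throw new OutOfRange(position);
        }
        List<Long> offsets = new ArrayList<>();
        for (long offset = position; offset < endOffset; offset++) {
            offsets.add(offset);
        }
        position = endOffset;
        return offsets;
    }

    public static void main(String[] args) {
        // Same numbers as the unit test: records 5..14, checkpoint 1 already expired.
        GuardedChangelog changelog = new GuardedChangelog(5L, 15L, 1L);
        try {
            changelog.poll();
        } catch (OutOfRange e) {
            System.out.println("first poll failed at position " + e.position);
        }
        changelog.seekToBeginning();
        System.out.println("restored " + changelog.poll().size() + " records");
    }
}
```

Running `main` prints `first poll failed at position 1` followed by `restored 10 records`, which is the sequence the new test expects from the changelog reader: one failed restore call, then a full restore from the start.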

[jira] [Commented] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic

2018-11-26 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699680#comment-16699680
 ] 

Matthias J. Sax commented on KAFKA-7443:


[~linyli] thanks for the patch. I added you to the list of contributors and
assigned the ticket to you. You can now also self-assign tickets.

> OffsetOutOfRangeException in restoring state store from changelog topic when 
> start offset of local checkpoint is smaller than that of changelog topic
> -
>
> Key: KAFKA-7443
> URL: https://issues.apache.org/jira/browse/KAFKA-7443
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Major
>  Labels: feather
> Attachments: KAFKA-7443.url
>
>
> When restoring a local state store from a changelog topic with EOS enabled,
> Kafka Streams sometimes throws an OffsetOutOfRangeException such as:
> {quote}Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112}
>  at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950)
>  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157)
>  at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89)
>  at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734)
>  
> {quote}
> This scenario occurs when the changelog topic has deleted expired log segments
> according to retention.ms, while the offset in the local .checkpoint file is
> still the position at which the task last exited this instance. That offset
> may be smaller than the changelog topic's updated beginning offset, and
> restoring the store from the checkpointed offset then throws the exception.
> It can be reproduced as follows (Kafka Streams running with EOS):
>  # The task for topic partition test-1 is running on instance A. When the task exits, Kafka Streams writes the last committed offset, 100, for test-1 to the checkpoint file.
>  # Task test-1 is transferred to instance B.
>  # Meanwhile, the changelog topic for test-1 advances its start offset to 120, because the old log segment reaches its retention time and is deleted.
>  # Later, task test-1 leaves instance B and resumes on instance A. The task restores A's local state store from checkpoint offset 100, which is smaller than 120, the earliest valid offset of the changelog topic, so the exception is thrown.
> When this exception occurs, Kafka Streams tries to reinitialize the task and
> restore from the beginning in the catch block below. Unfortunately, this
> handler does not work, and the task keeps throwing OffsetOutOfRangeException
> in every subsequent restore attempt.
> {code:java}
> // org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
> // handler for OffsetOutOfRangeException in Kafka Streams
> catch (final InvalidOffsetException recoverableException) {
>     log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
>     final Set<TopicPartition> partitions = recoverableException.partitions();
>     for (final TopicPartition partition : partitions) {
>         final StreamTask task = active.restoringTaskFor(partition);
>         log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
>         needsInitializing.remove(partition);
>         needsRestoring.remove(partition);
>
>         task.reinitializeStateStoresForPartitions(recoverableException.partitions());
>     }
>     restoreConsumer.seekToBeginning(partitions);
> }{code}
>  
>  Investigating why the handler for this exception does not work, I found the
> root cause:
>  Kafka Streams registers state restorers in the variable stateRestorers, which
> is used to read/update the start and end offsets for restoring the local
> state store.
> {code:java}
> 
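
The root cause above can be illustrated with a small model of the restore loop, using the numbers from the reproduction steps (checkpoint 100, changelog start offset 120). This is a hypothetical sketch, not Kafka Streams code: it assumes that the restorer registered in `stateRestorers` keeps its checkpoint across reinitialization, so each new attempt seeks back to the stale offset even though the catch block called `seekToBeginning`. That stale checkpoint is what PR #5946 resets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the pre-fix restore loop; not Kafka Streams code.
final class StaleCheckpointLoop {

    static final long NO_CHECKPOINT = -1L;

    /**
     * Returns the start offset used by each restore attempt. An attempt fails
     * (OffsetOutOfRangeException in the real client) when its start offset is
     * below the changelog's log start offset.
     */
    static List<Long> attemptedStartOffsets(long checkpoint, long logStartOffset, int maxAttempts) {
        List<Long> attempts = new ArrayList<>();
        // Assumption: the old catch block wipes the store and calls seekToBeginning(),
        // but never touches the restorer's checkpoint, so it stays stale.
        long restorerCheckpoint = checkpoint;
        for (int i = 0; i < maxAttempts; i++) {
            // On (re)initialization the restorer seeks to its checkpoint if it has one.
            long start = restorerCheckpoint == NO_CHECKPOINT ? logStartOffset : restorerCheckpoint;
            attempts.add(start);
            if (start >= logStartOffset) {
                break; // start offset still exists in the log: restoration proceeds
            }
            // Otherwise this attempt fails and the next iteration retries from the same offset.
        }
        return attempts;
    }

    public static void main(String[] args) {
        // Reproduction from the issue: checkpoint 100, changelog start offset moved to 120.
        System.out.println(attemptedStartOffsets(100L, 120L, 3)); // prints [100, 100, 100]
    }
}
```

Capping the model at `maxAttempts` only keeps it terminating; under the stated assumption, the real loop has no such bound, which matches the repeated exceptions reported in the issue.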

[jira] [Commented] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698912#comment-16698912
 ] 

ASF GitHub Bot commented on KAFKA-7443:
---

linyli001 opened a new pull request #5946: KAFKA-7443: 
OffsetOutOfRangeException in restoring state store from changelog topic when 
start offset of local checkpoint is smaller than that of changelog topic
URL: https://github.com/apache/kafka/pull/5946
 
 
   See https://issues.apache.org/jira/browse/KAFKA-7443 for details.
   The fix sets the checkpoint of this state partition to "NO_CHECKPOINT" when
the offset in the local checkpoint file has expired, i.e., is older than the
current start offset of the changelog topic. The task then restores its local
state from the current beginning offset of the changelog topic, avoiding the
infinite loop caused by this exception.
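
The fix can be sketched as follows. `Restorer`, `tryRestore` and `startOffset` are hypothetical simplifications; only the idea of calling `setCheckpointOffset(NO_CHECKPOINT)` on the failing partition's restorer comes from the diff in PR #5946, and the value -1 for `NO_CHECKPOINT` matches what the new unit test asserts. With that test's numbers (expired checkpoint 1, beginning offset 5), the first attempt fails and resets the checkpoint, and the retry starts from offset 5.

```java
// Hypothetical sketch of the fix; not the actual Kafka Streams classes.
final class CheckpointResetFix {

    static final long NO_CHECKPOINT = -1L; // same sentinel value the unit test asserts

    /** Minimal stand-in for StateRestorer: only tracks the checkpoint offset. */
    static final class Restorer {
        private long checkpoint;

        Restorer(long checkpoint) {
            this.checkpoint = checkpoint;
        }

        long checkpoint() {
            return checkpoint;
        }

        void setCheckpointOffset(long offset) {
            this.checkpoint = offset;
        }
    }

    /** Offset a (re)initialized restore seeks to: the checkpoint if present, else the beginning. */
    static long startOffset(Restorer restorer, long beginningOffset) {
        return restorer.checkpoint() == NO_CHECKPOINT ? beginningOffset : restorer.checkpoint();
    }

    /**
     * One restore attempt. Returns true if the start offset is still in the log;
     * otherwise applies the fix (forget the expired checkpoint) and returns false.
     */
    static boolean tryRestore(Restorer restorer, long beginningOffset) {
        if (startOffset(restorer, beginningOffset) < beginningOffset) {
            restorer.setCheckpointOffset(NO_CHECKPOINT); // the reset added by the fix
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // Numbers from the new unit test: expired checkpoint 1, beginning offset 5.
        Restorer restorer = new Restorer(1L);
        boolean first = tryRestore(restorer, 5L);
        boolean second = tryRestore(restorer, 5L);
        // prints "false -1 true start=5"
        System.out.println(first + " " + restorer.checkpoint() + " " + second
                + " start=" + startOffset(restorer, 5L));
    }
}
```

The design choice is to discard the checkpoint rather than clamp it to the new beginning offset: since the catch block already wipes the local store, any remembered offset no longer describes the store's contents, so restoring from the changelog's current beginning is the consistent option.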
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> OffsetOutOfRangeException in restoring state store from changelog topic when 
> start offset of local checkpoint is smaller than that of changelog topic
> -
>
> Key: KAFKA-7443
> URL: https://issues.apache.org/jira/browse/KAFKA-7443
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: John Roesler
>Priority: Major
>  Labels: feather
> Attachments: KAFKA-7443.url
>
>

[jira] [Commented] (KAFKA-7443) OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic

2018-11-15 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688798#comment-16688798
 ] 

John Roesler commented on KAFKA-7443:
-

Hi [~linyli],

Wow, this seems like a pretty bad condition.

Since you already have a patch for it, would you like to open a pull request on
[https://github.com/apache/kafka]?

This would make it easier to review and comment on your proposed fix.

Thank you!

-John

> OffsetOutOfRangeException in restoring state store from changelog topic when 
> start offset of local checkpoint is smaller than that of changelog topic
> -
>
> Key: KAFKA-7443
> URL: https://issues.apache.org/jira/browse/KAFKA-7443
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: John Roesler
>Priority: Major
>  Labels: feather
>