[jira] [Commented] (KAFKA-6364) Add Second Check for End Offset During Restore

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364752#comment-16364752 ]

ASF GitHub Bot commented on KAFKA-6364:
---

guozhangwang closed pull request #4511: KAFKA-6364: second check for ensuring changelog topic not changed during restore
URL: https://github.com/apache/kafka/pull/4511
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c4929959211..ceb7024b97b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -51,7 +51,7 @@
     private final Map<String, List<PartitionInfo>> partitions;
     private final SubscriptionState subscriptions;
     private final Map<TopicPartition, Long> beginningOffsets;
-    private final Map<TopicPartition, Long> endOffsets;
+    private final Map<TopicPartition, List<Long>> endOffsets;
     private final Map<TopicPartition, OffsetAndMetadata> committed;
     private final Queue<Runnable> pollTasks;
     private final Set<TopicPartition> paused;
@@ -290,8 +290,26 @@ public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
             subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
     }
 
-    public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
-        endOffsets.putAll(newOffsets);
+    // needed for cases where you make a second call to endOffsets
+    public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+        innerUpdateEndOffsets(newOffsets, false);
+    }
+
+    public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+        innerUpdateEndOffsets(newOffsets, true);
+    }
+
+    private void innerUpdateEndOffsets(final Map<TopicPartition, Long> newOffsets,
+                                       final boolean replace) {
+
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
+            List<Long> offsets = endOffsets.get(entry.getKey());
+            if (replace || offsets == null) {
+                offsets = new ArrayList<>();
+            }
+            offsets.add(entry.getValue());
+            endOffsets.put(entry.getKey(), offsets);
+        }
     }
 
 @Override
@@ -354,7 +372,7 @@ public synchronized void resume(Collection<TopicPartition> partitions) {
     public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
-            Long endOffset = endOffsets.get(tp);
+            Long endOffset = getEndOffset(endOffsets.get(tp));
             if (endOffset == null)
                 throw new IllegalStateException("The partition " + tp + " does not have an end offset.");
             result.put(tp, endOffset);
@@ -430,7 +448,7 @@ private void resetOffsetPosition(TopicPartition tp) {
             if (offset == null)
                 throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
         } else if (strategy == OffsetResetStrategy.LATEST) {
-            offset = endOffsets.get(tp);
+            offset = getEndOffset(endOffsets.get(tp));
             if (offset == null)
                 throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
         } else {
@@ -438,4 +456,11 @@ private void resetOffsetPosition(TopicPartition tp) {
         }
         seek(tp, offset);
     }
+
+    private Long getEndOffset(List<Long> offsets) {
+        if (offsets == null || offsets.isEmpty()) {
+            return null;
+        }
+        return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
+    }
 }
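
For orientation (not part of the PR or of the JIRA thread), the following is a minimal sketch of how a test might drive the MockConsumer change above: updateEndOffsets replaces the recorded end offset, while the new addEndOffsets (introduced only by this PR's diff) queues an additional value, so that a second endOffsets() call observes a moved log end, simulating a record appended to the changelog after restoration started. The topic name and offsets are illustrative.

import java.util.Collections;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MovingEndOffsetSketch {
    public static void main(final String[] args) {
        final MockConsumer<byte[], byte[]> consumer =
                new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final TopicPartition tp = new TopicPartition("store-changelog", 0);

        // End offset visible when the restore begins.
        consumer.updateEndOffsets(Collections.singletonMap(tp, 10L));
        // Queue a second value so the log end appears to move to 11 later.
        consumer.addEndOffsets(Collections.singletonMap(tp, 11L));

        // First lookup returns 10 and pops it, since another value is queued ...
        System.out.println(consumer.endOffsets(Collections.singletonList(tp)));
        // ... the re-check then sees 11, as if a record had been appended
        // to the changelog during the restore.
        System.out.println(consumer.endOffsets(Collections.singletonList(tp)));
    }
}
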
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index b11c45ba313..5fcba76570e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -255,6 +255,15 @@ private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
                 throw new TaskMigratedException(task, topicPartition, endOffset, pos);
             }
 
+            // need to check for changelog topic
+            if (restorer.offsetLimit() == Long.MAX_VALUE) {
+                final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
+                if (!restorer.hasCompleted(pos, updatedEndOffset)) {
+                    throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
+

[jira] [Commented] (KAFKA-6364) Add Second Check for End Offset During Restore

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349456#comment-16349456 ]

ASF GitHub Bot commented on KAFKA-6364:
---

bbejeck opened a new pull request #4511: KAFKA-6364: second check for ensuring changelog topic not changed during restore
URL: https://github.com/apache/kafka/pull/4511
 
 
   Added a second check for the race condition where a store changelog topic is 
   updated during restore; the check is skipped for KTable source changelog topics, 
   which restore only up to an offset limit. 
   
   This will be tricky to test, but I wanted to push the PR to get feedback on 
   the approach.
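
For readers skimming the thread, here is a self-contained sketch of the check described above, under simplified assumptions: the helper name, its parameters, and the use of IllegalStateException in place of Streams' TaskMigratedException are illustrative only, not the PR's actual code.

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class EndOffsetRecheckSketch {

    // Re-reads the changelog's end offset once restoration appears complete.
    // offsetLimit is Long.MAX_VALUE for plain store changelogs; KTable source
    // topics carry a smaller limit and are skipped, since their input topic
    // may legitimately keep receiving writes.
    static void recheck(final Consumer<byte[], byte[]> restoreConsumer,
                        final TopicPartition topicPartition,
                        final long restoredOffset,
                        final long offsetLimit) {
        if (offsetLimit != Long.MAX_VALUE) {
            return;  // KTable source topic: skip the second check
        }
        final Long updatedEndOffset = restoreConsumer
                .endOffsets(Collections.singletonList(topicPartition))
                .get(topicPartition);
        if (updatedEndOffset == null || restoredOffset < updatedEndOffset) {
            // The changelog grew after the restore started, which suggests the
            // task has migrated; the real code throws TaskMigratedException.
            throw new IllegalStateException("End offset of " + topicPartition
                    + " moved to " + updatedEndOffset
                    + " but restored only to " + restoredOffset);
        }
    }
}
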
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Second Check for End Offset During Restore
> --
>
> Key: KAFKA-6364
> URL: https://issues.apache.org/jira/browse/KAFKA-6364
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.0.2
>
>
> We need to re-check the end offset when restoring a changelog topic to 
> guard against the race condition where an additional record is appended to the 
> log immediately after the restore starts.  We also need to add a check for 
> KTable source topics, where an offset limit is set.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)