MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579519988



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+
+        final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
   @mjsax I think I got your point. In the class under test (CUT), the call ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` makes the consumer client seek to that offset (if available) without committing it. The offset commit happens in another section of the code that is not under test here (line 407 of StreamsResetter.java).
   
   I will change the test logic so that the "given" condition is the call to ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` and the "when" condition is the call to ```client.commitSync();```.
   
   This way, the "then" condition can assert on the committed offsets. The same applies to the other test.
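   
   To make the intended given/when/then shape concrete, here is a minimal plain-Java sketch. It does not use the Kafka API: `FakeConsumer` and this `resetOffsetsTo` are hypothetical stand-ins for `MockConsumer` and the method under test, and clamping the requested offset into the partition's range is an assumption made only for the illustration. The point is only that a seek on its own leaves the committed offset untouched, so the assertion has to come after the commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in types; none of this is the Kafka or StreamsResetter API.
final class SeekThenCommitSketch {

    /** Minimal fake consumer that tracks a position and a committed offset per partition. */
    static final class FakeConsumer {
        private final Map<String, Long> positions = new HashMap<>();
        private final Map<String, Long> committed = new HashMap<>();

        /** Moves the in-memory position only; nothing is committed. */
        void seek(final String partition, final long offset) {
            positions.put(partition, offset);
        }

        /** Commits the current position of every partition that has one. */
        void commitSync() {
            committed.putAll(positions);
        }

        /** Returns the committed offset, or null if nothing was committed yet. */
        Long committed(final String partition) {
            return committed.get(partition);
        }
    }

    /**
     * Stand-in for the reset step: it only seeks. Clamping the requested offset
     * into [beginOffset, endOffset] is an assumption made for this sketch.
     */
    static void resetOffsetsTo(final FakeConsumer consumer, final String partition,
                               final long requested, final long beginOffset, final long endOffset) {
        final long target = Math.max(beginOffset, Math.min(requested, endOffset));
        consumer.seek(partition, target);
    }

    public static void main(final String[] args) {
        final FakeConsumer consumer = new FakeConsumer();

        // given: an empty partition (begin = end = 0) and a reset to offset 2
        resetOffsetsTo(consumer, "input-0", 2L, 0L, 0L);
        if (consumer.committed("input-0") != null) {
            throw new AssertionError("seek alone must not commit");
        }

        // when: the commit is triggered explicitly
        consumer.commitSync();

        // then: the committed offset is observable and can be asserted on
        final Long committedOffset = consumer.committed("input-0");
        if (committedOffset == null || committedOffset != 0L) {
            throw new AssertionError("expected committed offset 0, got " + committedOffset);
        }
        System.out.println("committed offset: " + committedOffset);
    }
}
```

   In the real test the same three steps would go through `MockConsumer` and `streamsResetter`, with the assertion reading the committed offset for `topicPartition` instead of polling for records.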




