MarcoLotz commented on a change in pull request #10042: URL: https://github.com/apache/kafka/pull/10042#discussion_r579519988
########## File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java ##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+
+        final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
@mjsax I think I got your point. In the CUT, the call ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` makes the consumer client seek to that offset (if available) without committing it. The offset commit happens in another section of the code that is not under test here (line 407 of StreamsResetter.java). I will change the test logic so that the "given" condition uses ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` and the "when" condition is the call to ```client.commitSync();```. That way, the "then" condition can assert on the committed offsets. The same applies to the other test.
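To illustrate the given/when/then split described above, here is a minimal, self-contained sketch. It does not use the Kafka client library: `FakeConsumer` and this `resetOffsetsTo` are hypothetical stand-ins written only for this example, and the clamping of the requested offset into the beginning/end range is an assumption about the reset behaviour, not a copy of `StreamsResetter`. The only point is that a seek changes the in-memory position, while the committed offset changes only once `commitSync()` is called.

```java
import java.util.HashMap;
import java.util.Map;

public class SeekThenCommitSketch {

    /** Hypothetical stand-in for a consumer: an in-memory position and a committed offset per partition. */
    static final class FakeConsumer {
        private final Map<String, Long> positions = new HashMap<>();
        private final Map<String, Long> committed = new HashMap<>();

        /** Moves the in-memory position only; nothing is committed. */
        void seek(final String partition, final long offset) {
            positions.put(partition, offset);
        }

        /** Commits the current in-memory positions. */
        void commitSync() {
            committed.putAll(positions);
        }

        /** Returns the committed offset, or null if nothing was committed yet. */
        Long committed(final String partition) {
            return committed.get(partition);
        }
    }

    /**
     * Hypothetical reset step (assumption): clamp the requested offset into
     * [beginning, end], then seek to it without committing.
     */
    static void resetOffsetsTo(final FakeConsumer consumer, final String partition,
                               final long requested, final long beginning, final long end) {
        final long target = Math.max(beginning, Math.min(requested, end));
        consumer.seek(partition, target);
    }

    public static void main(final String[] args) {
        final String partition = "topic-0";
        final FakeConsumer consumer = new FakeConsumer();

        // given: an empty partition (beginning == end == 0) and a reset to offset 2
        resetOffsetsTo(consumer, partition, 2L, 0L, 0L);
        if (consumer.committed(partition) != null) {
            throw new AssertionError("seek alone must not commit");
        }

        // when: the commit is issued explicitly
        consumer.commitSync();

        // then: the assertion is made on the committed offset, not on polled records
        if (consumer.committed(partition) != 0L) {
            throw new AssertionError("expected committed offset 0 for an empty partition");
        }
        System.out.println("committed offset: " + consumer.committed(partition));
    }
}
```

In the real test the same structure would presumably be expressed with `MockConsumer`, with the "then" step reading the committed offsets back from the consumer instead of counting polled records.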
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org