kamalcph commented on code in PR #15165: URL: https://github.com/apache/kafka/pull/15165#discussion_r1620748127
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ########## @@ -254,6 +259,61 @@ public void testCanProcessRecord() throws InterruptedException { assertEquals(3, handler.metadataCounter); } + @Test + public void testCanReprocessSkippedRecords() throws InterruptedException { + final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg"); + final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); + final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1)); + final TopicIdPartition tpId3 = new TopicIdPartition(topicId, new TopicPartition("sample", 3)); + assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1)); + assertNotEquals(partitioner.metadataPartition(tpId3), partitioner.metadataPartition(tpId0)); + + final int metadataPartition = partitioner.metadataPartition(tpId0); + final int anotherMetadataPartition = partitioner.metadataPartition(tpId3); + + // Mocking the consumer to be able to wait for the second reassignment + doAnswer(invocation -> { + if (consumerTask.isUserPartitionAssigned(tpId1) && !consumerTask.isUserPartitionAssigned(tpId3)) { + return ConsumerRecords.empty(); + } else { + return invocation.callRealMethod(); + } + }).when(consumer).poll(any()); + + consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L)); + consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(anotherMetadataPartition), 0L)); + final Set<TopicIdPartition> assignments = Collections.singleton(tpId0); + consumerTask.addAssignmentsForPartitions(assignments); + thread.start(); + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + " to be assigned"); + + // Adding metadata records in the order opposite to the order of assignments + addRecord(consumer, metadataPartition, tpId1, 0); + addRecord(consumer, 
metadataPartition, tpId0, 1); + TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record"); + // Only one record is processed, tpId1 record is skipped as unassigned + // but read offset is 1 e.g., record for tpId1 has been read by consumer + assertEquals(1, handler.metadataCounter); + + // Adding assignment for tpId1 after related metadata records have already been read + consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1)); + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned"); + + // Adding assignment for tpId0 to trigger the reset to last read offset + // and assignment for tpId3 that has different metadata partition to trigger the update of metadata snapshot + HashSet<TopicIdPartition> partitions = new HashSet<>(); Review Comment: nit: ``` Set<TopicIdPartition> partitions = new HashSet<>(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org