gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1643026116
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -309,49 +300,52 @@ public void testCanReprocessSkippedRecords() throws
InterruptedException {
// explicitly re-adding the records since MockConsumer drops them on
poll.
addRecord(consumer, metadataPartition, tpId1, 0);
addRecord(consumer, metadataPartition, tpId0, 1);
+ consumerTask.ingestRecords();
// Waiting for all metadata records to be re-read from the first
metadata partition number
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
"Couldn't read record");
+ assertEquals(Optional.of(1L),
consumerTask.readOffsetForMetadataPartition(metadataPartition));
// Verifying that all the metadata records from the first metadata
partition were processed properly.
- TestUtils.waitForCondition(() -> handler.metadataCounter == 2,
"Couldn't read record");
+ assertEquals(2, handler.metadataCounter);
}
@Test
- public void testMaybeMarkUserPartitionsAsReady() throws
InterruptedException {
+ public void testMaybeMarkUserPartitionsAsReady() {
final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
2L));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
- thread.start();
+ consumerTask.ingestRecords();
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
+ assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " +
tpId + " has not been assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
assertFalse(handler.isPartitionInitialized.containsKey(tpId));
IntStream.range(0, 5).forEach(offset -> addRecord(consumer,
metadataPartition, tpId, offset));
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)),
"Couldn't read record");
+ consumerTask.ingestRecords();
+ assertEquals(Optional.of(4L),
consumerTask.readOffsetForMetadataPartition(metadataPartition));
assertTrue(handler.isPartitionInitialized.get(tpId));
}
@ParameterizedTest
@CsvSource(value = {"0, 0", "500, 500"})
- public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long
beginOffset,
- long
endOffset) throws InterruptedException {
+ public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long
beginOffset, long endOffset) {
final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
beginOffset));
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
endOffset));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
- thread.start();
+ consumerTask.ingestRecords();
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
+ assertTrue(consumerTask.isUserPartitionAssigned(tpId), "Partition " +
tpId + " has not been assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
- TestUtils.waitForCondition(() ->
handler.isPartitionInitialized.containsKey(tpId),
- "should have initialized the partition");
+ assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should
have initialized the partition");
assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
}
@Test
public void testConcurrentAccess() throws InterruptedException {
- thread.start();
+ // Here we need to test concurrent access. When ConsumerTask is
ingesting records,
+ // we need to concurrently add partitions and perform close()
+ new Thread(consumerTask).start();
Review Comment:
Please store this thread in a variable and join it, like the others, to make
sure it has stopped before the end of the test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]