gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1640209104
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -84,42 +78,41 @@ public class ConsumerTaskTest {
private ConsumerTask consumerTask;
private MockConsumer<byte[], byte[]> consumer;
- private Thread thread;
@BeforeEach
public void beforeEach() {
final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
.collect(Collectors.toMap(Function.identity(), e -> 0L));
- consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
+ consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.updateBeginningOffsets(offsets);
consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L,
300_000L, Time.SYSTEM);
- thread = new Thread(consumerTask);
}
@AfterEach
- public void afterEach() throws InterruptedException {
- if (thread != null) {
- assertDoesNotThrow(() -> consumerTask.close(), "Close method threw
exception");
- thread.join(10_000);
- assertFalse(thread.isAlive(), "Consumer task thread is still
alive");
- }
+ public void afterEach() {
+ assertDoesNotThrow(() -> consumerTask.close(), "Close method threw
exception");
+ assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer
method threw exception");
+ assertTrue(consumer.closed());
}
/**
* Tests that the consumer task shuts down gracefully when there were no
assignments.
*/
@Test
- public void testCloseOnNoAssignment() throws InterruptedException {
- thread.start();
- Thread.sleep(10);
+ public void testCloseOnNoAssignment() {
assertDoesNotThrow(() -> consumerTask.close(), "Close method threw
exception");
+ assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer
method threw exception");
}
@Test
public void testIdempotentClose() {
- thread.start();
+ // Go through the closure process
consumerTask.close();
+ consumerTask.closeConsumer();
+
+ // Go through the closure process again
consumerTask.close();
+ consumerTask.closeConsumer();
Review Comment:
nit: closeConsumer is normally only called once during thread exit, so it's
probably not a good idea to call it again here.
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +129,46 @@ public void testUserTopicIdPartitionEquals() {
}
@Test
- public void testAddAssignmentsForPartitions() throws InterruptedException {
+ public void testAddAssignmentsForPartitions() {
final List<TopicIdPartition> idPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
- thread.start();
+ consumerTask.ingestRecords();
for (final TopicIdPartition idPartition : idPartitions) {
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " +
idPartition + " to be assigned");
Review Comment:
Can you find all of the `waitForCondition(() ->
consumerTask.isUserPartitionAssigned` assertions that were removed, and re-add
them as assertTrue calls?
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -308,63 +294,62 @@ public void testCanReprocessSkippedRecords() throws
InterruptedException {
// explicitly re-adding the records since MockConsumer drops them on
poll.
addRecord(consumer, metadataPartition, tpId1, 0);
addRecord(consumer, metadataPartition, tpId0, 1);
+ consumerTask.ingestRecords();
// Waiting for all metadata records to be re-read from the first
metadata partition number
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
"Couldn't read record");
+ assertEquals(Optional.of(1L),
consumerTask.readOffsetForMetadataPartition(metadataPartition));
// Verifying that all the metadata records from the first metadata
partition were processed properly.
- TestUtils.waitForCondition(() -> handler.metadataCounter == 2,
"Couldn't read record");
+ assertEquals(2, handler.metadataCounter);
}
@Test
- public void testMaybeMarkUserPartitionsAsReady() throws
InterruptedException {
+ public void testMaybeMarkUserPartitionsAsReady() {
final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
2L));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
- thread.start();
+ consumerTask.ingestRecords();
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
assertFalse(handler.isPartitionInitialized.containsKey(tpId));
IntStream.range(0, 5).forEach(offset -> addRecord(consumer,
metadataPartition, tpId, offset));
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)),
"Couldn't read record");
+ consumerTask.ingestRecords();
+ assertEquals(Optional.of(4L),
consumerTask.readOffsetForMetadataPartition(metadataPartition));
assertTrue(handler.isPartitionInitialized.get(tpId));
}
@ParameterizedTest
@CsvSource(value = {"0, 0", "500, 500"})
- public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long
beginOffset,
- long
endOffset) throws InterruptedException {
+ public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long
beginOffset, long endOffset) {
final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
final int metadataPartition = partitioner.metadataPartition(tpId);
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
beginOffset));
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
endOffset));
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
- thread.start();
+ consumerTask.ingestRecords();
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
- TestUtils.waitForCondition(() ->
handler.isPartitionInitialized.containsKey(tpId),
- "should have initialized the partition");
+ assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should
have initialized the partition");
assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
}
@Test
public void testConcurrentAccess() throws InterruptedException {
- thread.start();
final CountDownLatch latch = new CountDownLatch(1);
final TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0);
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(partitioner.metadataPartition(tpId)),
0L));
final Thread assignmentThread = new Thread(() -> {
try {
latch.await();
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+ consumerTask.ingestRecords();
Review Comment:
I think this test might need an additional Thread. Since ingestRecords is
only called on one thread normally, it's probably not a good idea to call it on
two separate threads here. And I don't think the test has the same meaning as
the original test if only one of assignmentThread and closeThread has the call
to ingestRecords.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]