gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1640209104


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -84,42 +78,41 @@ public class ConsumerTaskTest {
 
     private ConsumerTask consumerTask;
     private MockConsumer<byte[], byte[]> consumer;
-    private Thread thread;
 
     @BeforeEach
     public void beforeEach() {
         final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
             .collect(Collectors.toMap(Function.identity(), e -> 0L));
-        consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         consumer.updateBeginningOffsets(offsets);
         consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 
300_000L, Time.SYSTEM);
-        thread = new Thread(consumerTask);
     }
 
     @AfterEach
-    public void afterEach() throws InterruptedException {
-        if (thread != null) {
-            assertDoesNotThrow(() -> consumerTask.close(), "Close method threw 
exception");
-            thread.join(10_000);
-            assertFalse(thread.isAlive(), "Consumer task thread is still 
alive");
-        }
+    public void afterEach() {
+        assertDoesNotThrow(() -> consumerTask.close(), "Close method threw 
exception");
+        assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer 
method threw exception");
+        assertTrue(consumer.closed());
     }
 
     /**
      * Tests that the consumer task shuts down gracefully when there were no 
assignments.
      */
     @Test
-    public void testCloseOnNoAssignment() throws InterruptedException {
-        thread.start();
-        Thread.sleep(10);
+    public void testCloseOnNoAssignment() {
         assertDoesNotThrow(() -> consumerTask.close(), "Close method threw 
exception");
+        assertDoesNotThrow(() -> consumerTask.closeConsumer(), "CloseConsumer 
method threw exception");
     }
 
     @Test
     public void testIdempotentClose() {
-        thread.start();
+        // Go through the closure process
         consumerTask.close();
+        consumerTask.closeConsumer();
+
+        // Go through the closure process again
         consumerTask.close();
+        consumerTask.closeConsumer();

Review Comment:
   nit: closeConsumer is normally only called once during thread exit, so it's 
probably not a good idea to call it again here.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +129,46 @@ public void testUserTopicIdPartitionEquals() {
     }
 
     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + 
idPartition + " to be assigned");

Review Comment:
   Can you find all of the `waitForCondition(() -> 
consumerTask.isUserPartitionAssigned` assertions that were removed, and re-add 
them as assertTrue calls?



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -308,63 +294,62 @@ public void testCanReprocessSkippedRecords() throws 
InterruptedException {
         // explicitly re-adding the records since MockConsumer drops them on 
poll.
         addRecord(consumer, metadataPartition, tpId1, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
+        consumerTask.ingestRecords();
         // Waiting for all metadata records to be re-read from the first 
metadata partition number
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        assertEquals(Optional.of(1L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition));
         // Verifying that all the metadata records from the first metadata 
partition were processed properly.
-        TestUtils.waitForCondition(() -> handler.metadataCounter == 2, 
"Couldn't read record");
+        assertEquals(2, handler.metadataCounter);
     }
 
     @Test
-    public void testMaybeMarkUserPartitionsAsReady() throws 
InterruptedException {
+    public void testMaybeMarkUserPartitionsAsReady() {
         final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 2L));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();
 
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
         
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
         assertFalse(handler.isPartitionInitialized.containsKey(tpId));
         IntStream.range(0, 5).forEach(offset -> addRecord(consumer, 
metadataPartition, tpId, offset));
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)),
 "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertEquals(Optional.of(4L), 
consumerTask.readOffsetForMetadataPartition(metadataPartition));
         assertTrue(handler.isPartitionInitialized.get(tpId));
     }
 
     @ParameterizedTest
     @CsvSource(value = {"0, 0", "500, 500"})
-    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long 
beginOffset,
-                                                                  long 
endOffset) throws InterruptedException {
+    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long 
beginOffset, long endOffset) {
         final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
         final int metadataPartition = partitioner.metadataPartition(tpId);
         
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 beginOffset));
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 endOffset));
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
-        thread.start();
+        consumerTask.ingestRecords();
 
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
         
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
-        TestUtils.waitForCondition(() -> 
handler.isPartitionInitialized.containsKey(tpId),
-            "should have initialized the partition");
+        assertTrue(handler.isPartitionInitialized.containsKey(tpId), "Should 
have initialized the partition");
         
assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
     }
 
     @Test
     public void testConcurrentAccess() throws InterruptedException {
-        thread.start();
         final CountDownLatch latch = new CountDownLatch(1);
         final TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0);
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(partitioner.metadataPartition(tpId)),
 0L));
         final Thread assignmentThread = new Thread(() -> {
             try {
                 latch.await();
                 
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+                consumerTask.ingestRecords();

Review Comment:
   I think this test might need an additional Thread. Since ingestRecords is 
only called on one thread normally, it's probably not a good idea to call it on 
two separate threads here. And I don't think the test has the same meaning as 
the original test if only one of assignmentThread and closeThread has the call 
to ingestRecords.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to