chirag-wadhwa5 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1827261148


##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1451,14 +1544,135 @@ public void testLsoMovementByRecordsDeletion() {
 
         totalMessagesConsumed = new AtomicInteger(0);
         future = new CompletableFuture<>();
-        consumeMessages(totalMessagesConsumed, 0, "group1", 1, 5, true, future);
+        consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, future);
         assertEquals(0, totalMessagesConsumed.get());
         try {
             assertEquals(0, future.get());
         } catch (Exception e) {
             fail("Exception occurred : " + e.getMessage());
         }
-        adminClient.close();
+        producer.close();
+    }
+
+    @Test
+    public void testShareAutoOffsetResetDefaultValue() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
+        // Producing a record.
+        producer.send(record);
+        ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+        // No records should be consumed because share.auto.offset.reset has a default of "latest". Since the record
+        // was produced before the share partition was initialized (which happens after the first share fetch request
+        // in the poll method), the start offset would be the latest offset, i.e. 1 (the next offset after the already
+        // present 0th record)
+        assertEquals(0, records.count());
+        // Producing another record.
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        // Now the next record should be consumed successfully
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testShareAutoOffsetResetEarliest() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        // Changing the value of share.auto.offset.reset value to "earliest"
+        alterShareAutoOffsetReset("group1", "earliest");
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
+        // Producing a record.
+        producer.send(record);
+        ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+        // Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume
+        // all messages present on the partition
+        assertEquals(1, records.count());
+        // Producing another record.
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        // The next records should also be consumed successfully
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testShareAutoOffsetResetEarliestAfterLsoMovement() {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        // Changing the value of share.auto.offset.reset value to "earliest"
+        alterShareAutoOffsetReset("group1", "earliest");
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
+        // We write 10 records to the topic, so they would be written from offsets 0-9 on the topic.
+        try {
+            for (int i = 0; i < 10; i++) {
+                producer.send(record).get();
+            }
+        } catch (Exception e) {
+            fail("Failed to send records: " + e);
+        }
+
+        // We delete records before offset 5, so the LSO should move to 5.
+        adminClient.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(5L)));
+
+        AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
+        CompletableFuture<Integer> future = new CompletableFuture<>();
+        consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, future);
+        // The records returned belong to offsets 5-9.
+        assertEquals(5, totalMessagesConsumed.get());
+        try {
+            assertEquals(5, future.get());
+        } catch (Exception e) {
+            fail("Exception occurred : " + e.getMessage());
+        }
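
Note: the tests above rely on a helper, alterShareAutoOffsetReset, whose implementation is not part of this hunk. Below is a minimal sketch of what such a helper could look like, assuming share.auto.offset.reset is exposed as a GROUP-level dynamic config (as described in KIP-932) and altered through the Admin API; the class name, method shape, and use of ConfigResource.Type.GROUP are assumptions for illustration, not taken from this PR.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

final class ShareGroupConfigSketch {

    // Hypothetical helper: sets share.auto.offset.reset for the given share group
    // via an incremental config alteration and waits for the change to be applied.
    static void alterShareAutoOffsetReset(Admin admin, String groupId, String newValue) throws Exception {
        ConfigResource groupResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        AlterConfigOp setOp = new AlterConfigOp(
            new ConfigEntry("share.auto.offset.reset", newValue),
            AlterConfigOp.OpType.SET);
        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
        alterEntries.put(groupResource, Collections.singletonList(setOp));
        admin.incrementalAlterConfigs(alterEntries).all().get();
    }
}
```

Whether a GROUP config resource is accepted depends on the broker supporting share groups; the real helper in ShareConsumerTest.java may differ.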

Review Comment:
   Thanks for the review. Actually, the test will fail, but if I don't surround the .send().get() call with a try-catch, then I would need to add a throws clause to the method definition. Also, I was trying to be consistent with the other code in this file, where a try-catch has been used in similar situations.
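
   For context on the try-catch question, here is a self-contained sketch (not from the PR, plain JUnit 5) contrasting the two styles discussed: wrapping a blocking send().get() in try-catch with fail(), versus declaring the checked exceptions on the test method and letting JUnit report the failure directly.

   ```java
   import static org.junit.jupiter.api.Assertions.fail;

   import java.util.concurrent.CompletableFuture;
   import org.junit.jupiter.api.Test;

   class SendStyleSketch {

       // Stand-in for producer.send(record): a future that may complete exceptionally.
       private CompletableFuture<Long> send() {
           return CompletableFuture.completedFuture(42L);
       }

       // Style used in ShareConsumerTest: the try-catch keeps "throws" out of the signature.
       @Test
       public void sendWithTryCatch() {
           try {
               send().get();
           } catch (Exception e) {
               fail("Failed to send records: " + e);
           }
       }

       // Alternative: declare the checked exceptions so any failure fails the test without a catch block.
       @Test
       public void sendWithThrowsClause() throws Exception {
           send().get();
       }
   }
   ```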


