jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r622292494



##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(100L, latests.get(t1p0))
   }
 
+  @Test
+  def testBeginningOffsetsOnCompactedTopic(): Unit = {
+    val topic0 = "topicWithoutCompaction"
+    val topic1 = "topicWithCompaction"
+    val t0p0 = new TopicPartition(topic0, 0)
+    val t1p0 = new TopicPartition(topic1, 0)
+    val t1p1 = new TopicPartition(topic1, 1)
+    val partitions = Set(t0p0, t1p0, t1p1).asJava
+
+    val producerProps = new Properties()
+    // Each batch will hold about 10 records
+    producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256")
+    val producer = createProducer(configOverrides = producerProps)
+    // First topic will not have compaction. Simply a sanity test.
+    createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, 
recordsPerPartition = 100)
+
+    
+    // Second topic will have compaction. 
+    // The first partition will have compaction occur at offset 0 so 
beginningOffsets should be nonzero.
+    // The second partition will not have compaction occur at offset 0, so 
beginningOffsets will remain 0.
+    val props = new Properties()
+    props.setProperty(LogConfig.MaxCompactionLagMsProp, "1")
+    props.setProperty(LogConfig.CleanupPolicyProp, "compact")
+    props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01")
+    props.setProperty(LogConfig.SegmentBytesProp, "512")
+
+    // Send records to first partition -- all duplicates.
+    createTopic(topic1, numPartitions = 2, replicationFactor = 1, props)
+    TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 
0), "key")
+
+    // Send records to second partition -- only last 50 records are duplicates.
+    sendRecords(producer, 50, t1p1)
+    TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 
1), "key")
+
+    // Sleep to allow compaction to take place.
+    Thread.sleep(25000)

Review comment:
       Yeah. I was wondering about this too. I originally wrote this test to 
capture the exact case described in KAFKA-7556. I'll take a look at this 
again and see if we can bring the sleep time down.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to