[ https://issues.apache.org/jira/browse/KAFKA-10501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mykhailo Baluta updated KAFKA-10501:
------------------------------------
Description:

Some __consumer_offsets partitions contain "broken" messages in the second log segment. Example:
{code:java}
offset: 745253728 position: 49793647 CreateTime: 1594539245536 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
offset: 745253729 position: 49793844 CreateTime: 1594539245548 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 59
offset: 745256523 position: 50070884 CreateTime: 1594540927673 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 1 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: ABORT coordinatorEpoch: 59
offset: 745256543 position: 50073185 CreateTime: 1594541667798 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
{code}
It looks like the last two records were stored in the wrong order: a record with the older producerEpoch 0 was appended after the epoch 1 ABORT marker, and no ABORT/COMMIT marker ever follows it. This leaves the producer state with an ongoing transaction and firstUncleanableDirtyOffset = 745256543, so compaction is always skipped for such topic partitions.
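For illustration, here is a minimal sketch (hypothetical names and a made-up log end offset, not the actual LogCleanerManager code) of why an ongoing transaction pins the cleanable range: the dirty portion of the log cannot extend past the first offset of the earliest unterminated transaction, so an orphaned transactional record blocks compaction indefinitely.
{code:java}
// Hypothetical simplification of how the cleaner bounds its work.
final case class OngoingTxn(producerId: Long, firstOffset: Long)

// The uncleanable range starts at the earliest offset that still belongs
// to an open (unterminated) transaction; with no open transactions the
// whole dirty section up to the log end is eligible for cleaning.
def firstUncleanableDirtyOffset(logEndOffset: Long,
                                ongoingTxns: Iterable[OngoingTxn]): Long =
  if (ongoingTxns.isEmpty) logEndOffset
  else ongoingTxns.map(_.firstOffset).min

// With the dump above: producerId 37146 re-appends at 745256543 after its
// ABORT marker and no marker ever follows, so the bound never advances.
val stuck = firstUncleanableDirtyOffset(
  logEndOffset = 745300000L, // invented for the example
  ongoingTxns = Seq(OngoingTxn(producerId = 37146L, firstOffset = 745256543L)))
// stuck == 745256543
{code}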
h3. Possible solution

The related log entries look like:
{code:java}
WARN Producer's epoch at offset 1060744580 in __consumer_offsets-35 is 0, which is smaller than the last seen epoch 1 (kafka.log.ProducerAppendInfo){code}
Related code:
{code:java}
private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
  if (producerEpoch < updatedEntry.producerEpoch) {
    val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " +
      s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"

    if (origin == AppendOrigin.Replication) {
      warn(message)
    } else {
      throw new ProducerFencedException(message)
    }
  }
}
{code}
Perhaps an exception should also be thrown in the AppendOrigin.Replication case, to reject records with a stale producer epoch from __consumer_offsets topic partitions as well (see the sketch below).
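A minimal sketch of that suggestion (untested; whether fencing on the replication path is safe for follower consistency would still need evaluation):
{code:java}
private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
  if (producerEpoch < updatedEntry.producerEpoch) {
    val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " +
      s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
    // Suggested change: fail for every origin, including
    // AppendOrigin.Replication, instead of only logging a warning, so a
    // record carrying a stale producer epoch can never be appended after
    // that epoch has already been fenced.
    throw new ProducerFencedException(message)
  }
}
{code}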
> Log Cleaner never cleans up some __consumer_offsets partitions
> --------------------------------------------------------------
>
>                 Key: KAFKA-10501
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10501
>             Project: Kafka
>          Issue Type: Bug
>          Components: log, log cleaner
>    Affects Versions: 2.5.0
>            Reporter: Mykhailo Baluta
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)