[
https://issues.apache.org/jira/browse/KAFKA-19936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041199#comment-18041199
]
PoAn Yang commented on KAFKA-19936:
-----------------------------------
This test case cannot pass on the trunk branch, because ReplicaManager counts
duplicated records toward the metrics.
{noformat}
@Test
def testBrokerTopicStatsOnAppend(): Unit = {
val localId = 0
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), localId)
try {
// Create partition and make it leader
val brokerList = Seq[Integer](localId).asJava
val delta = createLeaderDelta(topicId, topicPartition, 0, brokerList,
brokerList)
val leaderMetadataImage = imageFromTopics(delta.apply())
replicaManager.applyDelta(delta, leaderMetadataImage)
// Check the state of that partition
val HostedPartition.Online(partition) =
replicaManager.getPartition(topicPartition)
assertTrue(partition.isLeader)
assertEquals(Set(localId), partition.inSyncReplicaIds)
assertEquals(0, partition.getLeaderEpoch)
// Get initial metric values
val topicStats = replicaManager.brokerTopicStats.topicStats("foo")
val allTopicsStats = replicaManager.brokerTopicStats.allTopicsStats
var initialTopicBytesIn = topicStats.bytesInRate.count()
var initialAllTopicsBytesIn = allTopicsStats.bytesInRate.count()
var initialTopicMessagesIn = topicStats.messagesInRate.count()
var initialAllTopicsMessagesIn = allTopicsStats.messagesInRate.count()
// Append records
val producerId = 234L
val epoch = 5.toShort
val records = MemoryRecords.withIdempotentRecords(Compression.NONE,
producerId, epoch, 0,
new SimpleRecord(s"message 0".getBytes))
appendRecords(replicaManager, topicPartition, records).onFire { response =>
assertEquals(Errors.NONE, response.error)
}
// Verify broker topic stats were updated correctly
assertTrue(topicStats.bytesInRate.count() > initialTopicBytesIn)
assertTrue(allTopicsStats.bytesInRate.count() > initialAllTopicsBytesIn)
assertTrue(topicStats.messagesInRate.count() > initialTopicMessagesIn)
assertTrue(allTopicsStats.messagesInRate.count() >
initialAllTopicsMessagesIn)
initialTopicBytesIn = topicStats.bytesInRate.count()
initialAllTopicsBytesIn = allTopicsStats.bytesInRate.count()
initialTopicMessagesIn = topicStats.messagesInRate.count()
initialAllTopicsMessagesIn = allTopicsStats.messagesInRate.count()
// Append duplicated records
appendRecords(replicaManager, topicPartition, records).onFire { response =>
assertEquals(Errors.NONE, response.error)
}
// Verify broker topic stats skips duplicated records
assertEquals(initialTopicBytesIn, topicStats.bytesInRate.count())
assertEquals(initialAllTopicsBytesIn, allTopicsStats.bytesInRate.count())
assertEquals(initialTopicMessagesIn, topicStats.messagesInRate.count())
assertEquals(initialAllTopicsMessagesIn,
allTopicsStats.messagesInRate.count())
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}{noformat}
> ReplicaManager counts duplicated records to BytesInPerSec and
> MessagesInPerSec metric
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-19936
> URL: https://issues.apache.org/jira/browse/KAFKA-19936
> Project: Kafka
> Issue Type: Bug
> Reporter: PoAn Yang
> Assignee: PoAn Yang
> Priority: Major
>
> For an idempotent producer, duplicated records are not written to disk;
> however, they still contribute to the {{BytesInPerSec}} and
> {{MessagesInPerSec}} metrics.
> 1. If the records are duplicates, UnifiedLog skips these messages.
> [https://github.com/apache/kafka/blob/d27d90ccb3b2b98e02de42afd50910fbbbc162d0/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java#L1221-L1234]
> 2. ReplicaManager counts the result of Partition#appendRecordsToLeader toward
> the metrics.
> [https://github.com/apache/kafka/blob/d27d90ccb3b2b98e02de42afd50910fbbbc162d0/core/src/main/scala/kafka/server/ReplicaManager.scala#L1429-L1437]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)