nikramakrishnan commented on code in PR #15014:
URL: https://github.com/apache/kafka/pull/15014#discussion_r1426672403
##########
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##########
@@ -143,6 +144,52 @@ class DelayedRemoteFetchTest {
assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get.error)
}
+ @Test
+ def testRequestExpiry(): Unit = {
+ var actualTopicPartition: Option[TopicIdPartition] = None
+ var fetchResultOpt: Option[FetchPartitionData] = None
+
+ def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit
= {
+ assertEquals(1, responses.size)
+ actualTopicPartition = Some(responses.head._1)
+ fetchResultOpt = Some(responses.head._2)
+ }
+
+ val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null, false)
+ val highWatermark = 100
+ val leaderLogStartOffset = 10
+ val logReadInfo = buildReadResult(Errors.NONE, highWatermark,
leaderLogStartOffset)
+ val remoteFetchTask = mock(classOf[Future[Void]])
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future,
fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
+ Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+ .thenReturn(mock(classOf[Partition]))
+
+ // Force the delayed remote fetch to expire
+ delayedRemoteFetch.run()
+
+ // Check that the task was cancelled and force-completed
+ verify(remoteFetchTask).cancel(true)
+ assertTrue(delayedRemoteFetch.isCompleted)
+
+ // Check that the ExpiresPerSec metric was incremented.
+ val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
+ assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName ==
"kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
Review Comment:
Good shout. I also discovered that the replicaId was set to that of a follower
(> 0). I have fixed both issues and updated the PR. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]