Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-04-01 Thread via GitHub


johnnychhsu commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2029543783

   @kamalcph thanks for the comment, and sorry for the late reply. 
   Previously we get the counts from the yammer metrics, however, although the 
`onExpire` has called the `mark`, the counter value was not updated. 
   I have tried to log the process and observe the behaviour, everything want 
as expected, but the metrics still didn't reflect. 
   After discussion with @showuon , we found that the potential reason could 
be, that metrics was affected by other tests, because all those tests run in 
the same JVM.
   By change the way we verify the metrics, the updated test can still cover 
the original test case, but without the flaky issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-31 Thread via GitHub


showuon merged PR #15463:
URL: https://github.com/apache/kafka/pull/15463


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-31 Thread via GitHub


showuon commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2029068424

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-30 Thread via GitHub


chia7712 commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2028125395

   > For my understanding, Could you please explain how this patch fixed the 
issue? The safeYammerMetricValue was also returning the meter count. Thanks!
   
   @kamalcph the variable of metrics item 
(`kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec`) is singleton 
object and it could be removed by other tests which are running in same JVM 
(and it is not recreated). Hence, verifying the metrics value is not stable to 
this test case.
   
   For example, the following test case fails on the last `assert`
   ```scala
   TestUtils.clearYammerMetrics()
   DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
   // pass since the singleton object is created lazily when it is used
   assertEquals(1, 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
   TestUtils.clearYammerMetrics()
   assertEquals(0, 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
   DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
   // fails since the object is removed from yammer
   assertEquals(1, 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-30 Thread via GitHub


chia7712 commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1545435135


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]

Review Comment:
   As we don't use value from yammer now, it seems the 
`metricsToBeDeletedInTheEnd` is useless.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-29 Thread via GitHub


kamalcph commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2027930309

   > Instead of getting the number from yammer metrics, we can check the metric 
counter and see if it increased
   
   For my understanding, Could you please explain how this patch fixed the 
issue? The `safeYammerMetricValue` was also returning the `meter` count. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-29 Thread via GitHub


johnnychhsu commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1544639805


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+  val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  assertEquals(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   @showuon just updated, thanks for the comment!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-29 Thread via GitHub


johnnychhsu commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2027410113

   @kamalcph just updated, thanks for the review! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-29 Thread via GitHub


johnnychhsu commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r154463


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1460,10 +1460,8 @@ class ReplicaManager(val config: KafkaConfig,
 warn("Unable to fetch data from remote storage", e)
 return Some(createLogReadResult(e))
 }
-

Review Comment:
   sure, just removed it, thanks for the review @chia7712 !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-28 Thread via GitHub


showuon commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1542760219


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+  val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  assertEquals(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   OK, the testRemoteFetchExpiresPerSecMetric are all passed. Please use 
`TestUtils.waitUntilTrue` here, otherwise, LGTM! Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-28 Thread via GitHub


chia7712 commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1542616917


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1460,10 +1460,8 @@ class ReplicaManager(val config: KafkaConfig,
 warn("Unable to fetch data from remote storage", e)
 return Some(createLogReadResult(e))
 }
-

Review Comment:
   Could you please remove those unrelated changes? I guess they are vestige 
code of tracking this flaky :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-27 Thread via GitHub


kamalcph commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2024392191

   @johnnychhsu 
   
   Can you update the PR summary if it is ready for review? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-27 Thread via GitHub


showuon commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1542235961


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+  val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  assertEquals(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   Looks like it didn't appear in the rebuild. Let me re-trigger the CI again.
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15463/13/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-27 Thread via GitHub


johnnychhsu commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1541419703


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+  val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  assertEquals(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   ah i see. thanks for the comment @showuon ! 
   let me address that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-25 Thread via GitHub


showuon commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1538452997


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+  val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  assertEquals(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   I think we should use use `TestUtils.waitUntilTrue` here because the 
`DelayedRemoteFetchMetrics` is marked in a separate thread, we can't make sure 
it will be triggered immediately. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-25 Thread via GitHub


johnnychhsu commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2018127603

   updated @showuon 
   it works in my local, thanks for the sharing!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-25 Thread via GitHub


johnnychhsu commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1537571752


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4166,12 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  verify(mockDelayedRemoteFetchMetricsMeter, times(1)).mark()

Review Comment:
   thanks @showuon for the suggestion! sure let me try that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-21 Thread via GitHub


showuon commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1534892451


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4166,12 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  verify(mockDelayedRemoteFetchMetricsMeter, times(1)).mark()

Review Comment:
   Yes, it indeed fixed the issue. But I found there's a better way to verify 
it without changing `expiredRequestMeter` from `val` to `var`. We can verify 
like this:
   
   ```
   val curExpiresPerSec = DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(...)
   timer.advanceClock(2000L)
   assertTrue(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())
   ```
   
   Had a test, and it works. Would you please give it a try?
   Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org