showuon commented on code in PR #14832:
URL: https://github.com/apache/kafka/pull/14832#discussion_r1418673462
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -699,22 +702,125 @@ void testRemoteLogManagerTasksAvgIdlePercentMetrics()
throws Exception {
Partition mockFollowerPartition =
mockPartition(followerTopicIdPartition);
// before running tasks, the remote log manager tasks should be all
idle
- assertEquals(1.0,
yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
+ assertEquals(1.0, (double)
yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
+ Map<MetricName, Metric> metricNameMetricMapBefore =
KafkaYammerMetrics.defaultRegistry().allMetrics();
remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition),
Collections.singleton(mockFollowerPartition), topicIds);
- assertTrue(yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") <
1.0);
+ Map<MetricName, Metric> metricNameMetricMapAfter =
KafkaYammerMetrics.defaultRegistry().allMetrics();
+ assertTrue((double)
yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0);
// unlock copyLogSegmentData
latch.countDown();
}
- private double yammerMetricValue(String name) {
- Gauge<Double> gauge = (Gauge)
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+ @Test
+ void testRemoteLogManagerRemoteCopyLagBytes() throws Exception {
+ long oldestSegmentStartOffset = 0L;
+ long olderSegmentStartOffset = 75L;
+ long nextSegmentStartOffset = 150L;
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 3 log segments, with 0, 75 and 150 as log start offset
+ LogSegment oldestSegment = mock(LogSegment.class);
+ LogSegment olderSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldestSegment.baseOffset()).thenReturn(oldestSegmentStartOffset);
+ when(olderSegment.baseOffset()).thenReturn(olderSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ FileRecords oldestFileRecords = mock(FileRecords.class);
+ when(oldestSegment.log()).thenReturn(oldestFileRecords);
+ when(oldestFileRecords.file()).thenReturn(tempFile);
+ when(oldestFileRecords.sizeInBytes()).thenReturn(10);
+
when(oldestSegment.readNextOffset()).thenReturn(olderSegmentStartOffset);
+
+ FileRecords olderFileRecords = mock(FileRecords.class);
+ when(olderSegment.log()).thenReturn(olderFileRecords);
+ // TODO: Do I need another temporary file?
+ when(olderFileRecords.file()).thenReturn(tempFile);
+ when(olderFileRecords.sizeInBytes()).thenReturn(10);
+ when(olderSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldestSegment,
olderSegment, activeSegment)));
+
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
+ OffsetIndex oldestIdx =
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir,
oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
+ TimeIndex oldestTimeIdx =
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset,
""), oldestSegmentStartOffset, 1500).get();
+ File oldestTxnFile = UnifiedLog.transactionIndexFile(tempDir,
oldestSegmentStartOffset, "");
+ oldestTxnFile.createNewFile();
+ TransactionIndex oldestTxnIndex = new
TransactionIndex(oldestSegmentStartOffset, oldestTxnFile);
+ when(oldestSegment.timeIndex()).thenReturn(oldestTimeIdx);
+ when(oldestSegment.offsetIndex()).thenReturn(oldestIdx);
+ when(oldestSegment.txnIndex()).thenReturn(oldestTxnIndex);
+
+ OffsetIndex olderIdx =
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir,
olderSegmentStartOffset, ""), olderSegmentStartOffset, 1000).get();
+ TimeIndex olderTimeIdx =
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, olderSegmentStartOffset,
""), olderSegmentStartOffset, 1500).get();
+ File olderTxnFile = UnifiedLog.transactionIndexFile(tempDir,
olderSegmentStartOffset, "");
+ oldestTxnFile.createNewFile();
+ TransactionIndex olderTxnIndex = new
TransactionIndex(olderSegmentStartOffset, olderTxnFile);
+ when(olderSegment.timeIndex()).thenReturn(olderTimeIdx);
+ when(olderSegment.offsetIndex()).thenReturn(olderIdx);
+ when(olderSegment.txnIndex()).thenReturn(olderTxnIndex);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(ans -> Optional.empty()).doAnswer(ans -> {
+ // waiting for verification
+ latch.await();
+ return Optional.empty();
+
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+ Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
+ Partition mockFollowerPartition =
mockPartition(followerTopicIdPartition);
+
+ when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(175L, 100L);
+ when(activeSegment.size()).thenReturn(100);
+
+ // before running tasks, the metric should not be registered
+ assertThrows(NoSuchElementException.class, () ->
yammerMetricValue("RemoteCopyLogBytes"));
+
remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition),
Collections.singleton(mockFollowerPartition), topicIds);
+ TestUtils.waitForCondition(
+ () -> 75 == safeYammerMetricValue("RemoteCopyLagBytes"),
+ 1000,
Review Comment:
I think we don't need to change the wait time. Using default value makes it
more reliable. We'll jump out the wait loop once the condition met anyway.
```
TestUtils.waitForCondition(
() -> 75 == safeYammerMetricValue("RemoteCopyLagBytes"),
String.format("Expected to find 75 for RemoteCopyLagBytes
metric value, but found %d", safeYammerMetricValue("RemoteCopyLagBytes")));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]