hachikuji commented on code in PR #12081:
URL: https://github.com/apache/kafka/pull/12081#discussion_r856597477
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -885,15 +885,18 @@ class Partition(val topicPartition: TopicPartition,
*
* @return true if the HW was incremented, and false otherwise.
*/
- private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, curTime: Long =
time.milliseconds): Boolean = {
+ private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs:
Long = time.milliseconds): Boolean = {
Review Comment:
Do we need the default argument for `currentTimeMs`? It looks like we have
already computed the time in `makeLeader`.
##########
core/src/main/scala/kafka/cluster/Replica.scala:
##########
@@ -62,39 +94,85 @@ class Replica(val brokerId: Int, val topicPartition:
TopicPartition) extends Log
* fetch request is always smaller than the leader's LEO, which can happen
if small produce requests are received at
* high frequency.
*/
- def updateFetchState(followerFetchOffsetMetadata: LogOffsetMetadata,
- followerStartOffset: Long,
- followerFetchTimeMs: Long,
- leaderEndOffset: Long): Unit = {
- if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset)
- _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, followerFetchTimeMs)
- else if (followerFetchOffsetMetadata.messageOffset >=
lastFetchLeaderLogEndOffset)
- _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)
-
- _logStartOffset = followerStartOffset
- _logEndOffsetMetadata = followerFetchOffsetMetadata
- lastFetchLeaderLogEndOffset = leaderEndOffset
- lastFetchTimeMs = followerFetchTimeMs
+ def updateFetchState(
+ followerFetchOffsetMetadata: LogOffsetMetadata,
+ followerStartOffset: Long,
+ followerFetchTimeMs: Long,
+ leaderEndOffset: Long
+ ): Unit = {
+ replicaState.updateAndGet { currentReplicaState =>
+ val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >=
leaderEndOffset) {
+ math.max(currentReplicaState.lastCaughtUpTimeMs, followerFetchTimeMs)
+ } else if (followerFetchOffsetMetadata.messageOffset >=
currentReplicaState.lastFetchLeaderLogEndOffset) {
+ math.max(currentReplicaState.lastCaughtUpTimeMs,
currentReplicaState.lastFetchTimeMs)
+ } else {
+ currentReplicaState.lastCaughtUpTimeMs
+ }
+
+ ReplicaState(
+ logStartOffset = followerStartOffset,
+ logEndOffsetMetadata = followerFetchOffsetMetadata,
+ lastFetchLeaderLogEndOffset = math.max(leaderEndOffset,
currentReplicaState.lastFetchLeaderLogEndOffset),
+ lastFetchTimeMs = followerFetchTimeMs,
+ lastCaughtUpTimeMs = lastCaughtUpTime
+ )
+ }
}
- def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long,
lastCaughtUpTimeMs: Long): Unit = {
- lastFetchLeaderLogEndOffset = curLeaderLogEndOffset
- lastFetchTimeMs = curTimeMs
- _lastCaughtUpTimeMs = lastCaughtUpTimeMs
+ /**
+ * When the leader is elected or re-elected, the state of the follower is
reinitialized
+ * accordingly.
+ */
+ def resetReplicaState(
+ currentTimeMs: Long,
+ leaderEndOffset: Long,
+ isNewLeader: Boolean,
+ isFollowerInSync: Boolean
+ ): Unit = {
+ replicaState.updateAndGet { currentReplicaState =>
+ // When the leader is elected or re-elected, the follower's last caught
up time
+ // is set to the current time if the follower is in the ISR, else to 0.
The later
Review Comment:
nit: "The later" should be "The latter"?
##########
core/src/test/scala/unit/kafka/cluster/LogTest.scala:
##########
@@ -0,0 +1,132 @@
+/*
Review Comment:
I'm ok with that, but it might not be as much work as you imagine:
`UnifiedLogTest` has a pretty minimalistic setup.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]