dajac commented on code in PR #12150:
URL: https://github.com/apache/kafka/pull/12150#discussion_r875822488
##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -439,18 +545,33 @@ class PartitionTest extends AbstractPartitionTest {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = false)
- def assertReadRecordsError(error: Errors,
- currentLeaderEpochOpt: Optional[Integer],
- fetchOnlyLeader: Boolean): Unit = {
+ def assertReadRecordsError(
Review Comment:
Thanks. It already looks much better like this.
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1133,16 +1131,95 @@ class Partition(val topicPartition: TopicPartition,
info.copy(leaderHwChange = if (leaderHWIncremented)
LeaderHwChange.Increased else LeaderHwChange.Same)
}
- def readRecords(lastFetchedEpoch: Optional[Integer],
- fetchOffset: Long,
- currentLeaderEpoch: Optional[Integer],
- maxBytes: Int,
- fetchIsolation: FetchIsolation,
- fetchOnlyFromLeader: Boolean,
- minOneMessage: Boolean): LogReadInfo =
inReadLock(leaderIsrUpdateLock) {
- // decide whether to only fetch from leader
- val localLog = localLogWithEpochOrException(currentLeaderEpoch,
fetchOnlyFromLeader)
+ def fetchRecords(
+ fetchParams: FetchParams,
+ fetchPartitionData: FetchRequest.PartitionData,
+ fetchTimeMs: Long,
+ maxBytes: Int,
+ minOneMessage: Boolean,
+ updateFetchState: Boolean
+ ): LogReadInfo = {
+ def doReadRecords(log: UnifiedLog): LogReadInfo = {
+ readRecords(
+ log,
+ fetchPartitionData.lastFetchedEpoch,
+ fetchPartitionData.fetchOffset,
+ fetchPartitionData.currentLeaderEpoch,
+ maxBytes,
+ fetchParams.isolation,
+ minOneMessage
+ )
+ }
+
+ if (fetchParams.isFromFollower) {
+ val replica = followerReplicaOrThrow(fetchParams.replicaId,
fetchPartitionData)
+
+ val logReadInfo = inReadLock(leaderIsrUpdateLock) {
+ doReadRecords(localLogWithEpochOrThrow(
+ fetchPartitionData.currentLeaderEpoch,
+ fetchParams.fetchOnlyLeader
+ ))
+ }
+ if (updateFetchState && logReadInfo.divergingEpoch.isEmpty) {
+ updateFollowerFetchState(
+ replica,
+ followerFetchOffsetMetadata =
logReadInfo.fetchedData.fetchOffsetMetadata,
+ followerStartOffset = fetchPartitionData.logStartOffset,
+ followerFetchTimeMs = fetchTimeMs,
+ leaderEndOffset = logReadInfo.logEndOffset
+ )
+ }
+
+ logReadInfo
+ } else {
+ inReadLock(leaderIsrUpdateLock) {
+ val localLog =
localLogWithEpochOrThrow(fetchPartitionData.currentLeaderEpoch,
fetchParams.fetchOnlyLeader)
+ doReadRecords(localLog)
+ }
Review Comment:
nit: This block of code, which reads from the local log, is nearly identical to
the one in the follower branch above. Did you consider moving it entirely into
`doReadRecords`? That would reduce the code duplication.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]