hachikuji commented on a change in pull request #11294: URL: https://github.com/apache/kafka/pull/11294#discussion_r703942630
########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -3242,6 +3260,92 @@ class ReplicaManagerTest { } } + @Test + def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = { + val localId = 1 + val otherId = localId + 1 + val topicPartition = new TopicPartition("foo", 0) + + val mockReplicaFetcherManager = Mockito.mock(classOf[ReplicaFetcherManager]) + val replicaManager = setupReplicaManagerWithMockedPurgatories( + timer = new MockTimer(time), + brokerId = localId, + mockReplicaFetcherManager = Some(mockReplicaFetcherManager) + ) + + try { + // The first call to removeFetcherForPartitions should be ignored. + Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions( + Set(topicPartition)) + ).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) + + // Make the local replica the follower + var followerTopicsDelta = topicsCreateDelta(localId, false) + var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + // Check the state of that partition + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(0, followerPartition.getLeaderEpoch) + assertEquals(0, followerPartition.localLogOrException.logEndOffset) + + // Verify that addFetcherForPartitions was called with the correct + // init offset. + Mockito.verify(mockReplicaFetcherManager, Mockito.times(1)) + .addFetcherForPartitions( + Map(topicPartition -> InitialFetchState( + leader = BrokerEndPoint(otherId, "localhost", 9093), + currentLeaderEpoch = 0, + initOffset = 0 + )) + ) + + // The second call to removeFetcherForPartitions simulates the case + where the fetcher writes to the log before being shut down. 
+ Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions( + Set(topicPartition)) + ).thenAnswer { _ => + replicaManager.getPartition(topicPartition) match { + case HostedPartition.Online(partition) => + partition.appendRecordsToFollowerOrFutureReplica( + records = MemoryRecords.withRecords(CompressionType.NONE, 0, + new SimpleRecord("first message".getBytes)), + isFuture = false + ) + + case _ => + } + + Map.empty[TopicPartition, PartitionFetchState] + } + + // Apply changes that bump the leader epoch. + followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, false) + followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + assertFalse(followerPartition.isLeader) + assertEquals(1, followerPartition.getLeaderEpoch) + assertEquals(1, followerPartition.localLogOrException.logEndOffset) + + // Verify that addFetcherForPartitions was called with the correct + // init offset. + Mockito.verify(mockReplicaFetcherManager, Mockito.times(1)) + .addFetcherForPartitions( + Map(topicPartition -> InitialFetchState( + leader = BrokerEndPoint(otherId, "localhost", 9093), + currentLeaderEpoch = 1, + initOffset = 1 + )) + ) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) Review comment: nit: is it worthwhile moving this to an `@AfterEach`? ########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -2220,7 +2210,22 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet) Review comment: Perhaps it's worth a comment here that stopping the fetchers is required so that we can initialize the fetch position correctly. It's a subtle point which we might miss again in the future. 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -2229,15 +2234,14 @@ class ReplicaManager(val config: KafkaConfig, } def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = { - stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach { - case (topicPartition, e) => - if (e.isInstanceOf[KafkaStorageException]) { - stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " + - "the local replica for the partition is in an offline log directory") - } else { - stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " + - s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}", e) - } + stopPartitions(topicPartitions.map(tp => tp -> true).toMap).forKeyValue { (topicPartition, e) => + if (e.isInstanceOf[KafkaStorageException]) { + stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " + + "the local replica for the partition is in an offline log directory") Review comment: nit: I was kind of wondering if it's worthwhile logging the exception message in here. We do construct the exception with different messages in different scenarios. 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -2229,15 +2234,14 @@ class ReplicaManager(val config: KafkaConfig, } def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = { - stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach { - case (topicPartition, e) => - if (e.isInstanceOf[KafkaStorageException]) { - stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " + - "the local replica for the partition is in an offline log directory") - } else { - stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " + - s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}", e) - } + stopPartitions(topicPartitions.map(tp => tp -> true).toMap).forKeyValue { (topicPartition, e) => + if (e.isInstanceOf[KafkaStorageException]) { Review comment: nit: could we do this with a `match` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org