mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r474934833
########## File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala ########## @@ -0,0 +1,132 @@ +package kafka.server + +import java.util +import java.util.concurrent.{ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicLong + +import kafka.api.LeaderAndIsr +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics} +import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation, + * so partitions will learn about updates through LeaderAndIsr messages sent from the controller + */ +trait AlterIsrChannelManager { + val IsrChangePropagationBlackOut = 5000L + val IsrChangePropagationInterval = 60000L + + def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit + + def clearPending(topicPartition: TopicPartition): Unit + + def startup(): Unit + + def shutdown(): Unit +} + +case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr) + +class AlterIsrChannelManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager, + val zkClient: KafkaZkClient, + val scheduler: Scheduler, + val brokerId: Int, + val brokerEpoch: Long) extends AlterIsrChannelManager with Logging with KafkaMetricsGroup { + + private val pendingIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]() + private val lastIsrChangeMs = new AtomicLong(0) + private val lastIsrPropagationMs = new 
AtomicLong(0) + + @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None + + override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = { + pendingIsrUpdates synchronized { + pendingIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem + lastIsrChangeMs.set(System.currentTimeMillis()) + // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening + // in fast succession + if (scheduledRequest.isEmpty) { + scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)) + } + } + } + + override def clearPending(topicPartition: TopicPartition): Unit = { + pendingIsrUpdates synchronized { + // when we get a new LeaderAndIsr, we clear out any pending requests + pendingIsrUpdates.remove(topicPartition) Review comment: Update: after some discussion and a look over the failed system tests, we ended up with the following error handling: * REPLICA_NOT_AVAILABLE and INVALID_REPLICA_ASSIGNMENT will clear the pending ISR to let the leader retry. This covers the case where a leader tries to add a replica to the ISR that is offline because the follower has just finished shutting down. * FENCED_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER, and UNKNOWN_TOPIC_OR_PARTITION will _not_ clear the pending state and therefore will not retry. We presume here that the controller is correct and the leader has stale metadata. By not clearing the pending ISR, the leader will await a LeaderAndIsr request before attempting any further ISR changes. * Other unspecified errors: clear the pending state and let the leader retry. It is not clear which cases could cause other errors, but it is probably better to be in a retry loop than to be completely stuck. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org