mumrah commented on a change in pull request #9732: URL: https://github.com/apache/kafka/pull/9732#discussion_r543533537
########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ########## @@ -149,29 +152,32 @@ case class BrokerToControllerQueueItem(request: AbstractRequest.Builder[_ <: Abs class BrokerToControllerRequestThread(networkClient: KafkaClient, metadataUpdater: ManualMetadataUpdater, - requestQueue: LinkedBlockingDeque[BrokerToControllerQueueItem], metadataCache: kafka.server.MetadataCache, config: KafkaConfig, listenerName: ListenerName, time: Time, threadName: String) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) { + private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]() private var activeController: Option[Node] = None - def generateRequests(): Iterable[RequestAndCompletionHandler] = { - val requestsToSend = new mutable.Queue[RequestAndCompletionHandler] - val topRequest = requestQueue.poll() - if (topRequest != null) { - val request = RequestAndCompletionHandler( + def enqueue(request: BrokerToControllerQueueItem): Unit = { + requestQueue.add(request) + if (activeController.isDefined) { + wakeup() + } + } + + override def generateRequests(): Iterable[RequestAndCompletionHandler] = { + Option(requestQueue.poll()).map { queueItem => Review comment: 👍 ########## File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala ########## @@ -53,6 +54,41 @@ object KafkaNetworkChannel { } +private[raft] class RaftSendThread( Review comment: style nit/question: I think we have a mixture of argument indentation for Scala classes/methods. Do we have an established style convention for this? 
Normally I follow the opening paren when breaking out arguments into their own line (though I'm not sure that's correct) ########## File path: raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.raft.RaftMessage; +import org.apache.kafka.raft.RaftMessageQueue; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class BlockingMessageQueue implements RaftMessageQueue { + private static final RaftMessage WAKEUP_MESSAGE = new RaftMessage() { Review comment: 👍 ########## File path: raft/src/main/java/org/apache/kafka/raft/RaftMessageQueue.java ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +/** + * This class is used to serialize inbound requests or responses to outbound requests. + * It basically just allows us to wrap a blocking queue so that we can have a mocked + * implementation which does not depend on system time. + * + * See {@link org.apache.kafka.raft.internals.BlockingMessageQueue}. + */ +public interface RaftMessageQueue { + + /** + * Block for the arrival of a new message. + * + * @param timeoutMs timeout in milliseconds to wait for a new event + * @return the event or null if either the timeout was reached or there was + * a call to {@link #wakeup()} before any events became available + */ + RaftMessage poll(long timeoutMs); + + /** + * Offer a new message to the queue. + * + * @param message the message to deliver + * @throws IllegalStateException if the queue cannot accept the message + */ + void offer(RaftMessage message); Review comment: nit: Maybe name this `add` so it aligns with the java.util.Queue method? ########## File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala ########## @@ -32,17 +32,19 @@ import scala.jdk.CollectionConverters._ /** * Class for inter-broker send thread that utilize a non-blocking network client. 
*/ -abstract class InterBrokerSendThread(name: String, - networkClient: KafkaClient, - time: Time, - isInterruptible: Boolean = true) - extends ShutdownableThread(name, isInterruptible) { +abstract class InterBrokerSendThread( + name: String, + networkClient: KafkaClient, + requestTimeoutMs: Int, + time: Time, + isInterruptible: Boolean = true +) extends ShutdownableThread(name, isInterruptible) { - def generateRequests(): Iterable[RequestAndCompletionHandler] - def requestTimeoutMs: Int private val unsentRequests = new UnsentRequests - def hasUnsentRequests = unsentRequests.iterator().hasNext + def generateRequests(): Iterable[RequestAndCompletionHandler] + + def hasUnsentRequests: Boolean = unsentRequests.iterator().hasNext Review comment: Is it worth adding a `size` or `isEmpty` to `UnsentRequests`? ########## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala ########## @@ -127,11 +127,14 @@ class TxnMarkerQueue(@volatile var destination: Node) { def totalNumMarkers(txnTopicPartition: Int): Int = markersPerTxnTopicPartition.get(txnTopicPartition).fold(0)(_.size) } -class TransactionMarkerChannelManager(config: KafkaConfig, - metadataCache: MetadataCache, - networkClient: NetworkClient, - txnStateManager: TransactionStateManager, - time: Time) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, time) with Logging with KafkaMetricsGroup { +class TransactionMarkerChannelManager( Review comment: nit: (related to style question elsewhere) if we want to change the style of these class definitions, can we do it as a separate PR? 
I always find it difficult when style changes are conflated with logical changes. ########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ########## @@ -221,7 +216,7 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient, } else { // need to backoff to avoid tight loops debug("No controller defined in metadata cache, retrying after backoff") - backoff() Review comment: Hmm, this seems strange, though maybe I'm missing something. If we get here, `activeController` is not defined. If we then call `pollOnce`, it looks like the request produced by `generateRequests` will fail with an exception, since the `activeController` Option is empty. Previously, the `backoff` method just paused the thread for some time. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org