Maysam Yabandeh created KAFKA-3963:
--------------------------------------

             Summary: Missing messages from the controller to brokers
                 Key: KAFKA-3963
                 URL: https://issues.apache.org/jira/browse/KAFKA-3963
             Project: Kafka
          Issue Type: Bug
          Components: core
            Reporter: Maysam Yabandeh
            Priority: Minor


The controller takes messages from a queue and sends them to the designated 
broker. If the controller times out waiting for a response from the broker 
(30s), it closes the connection and retries after a backoff period; however, 
it does not return the message to the queue. As a result, the retry starts 
with the next message, so the previous message may never be received by the 
broker.
{code}
    val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
...
          try {
...
              clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
...
            }
          } catch {
            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                  request.toString, brokerNode.toString()), e)
              networkClient.close(brokerNode.idString)
...
          }
{code}
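
One possible remedy (a sketch only, reusing the names from the excerpt above, 
not an actual patch) is to keep retrying the same queue item until a response 
is received, so that a timed-out request is re-sent instead of silently dropped:
{code}
    // Sketch: retry the SAME request after reconnecting, instead of falling
    // through to the next message in the queue. `isRunning` is assumed to be
    // the send thread's shutdown flag; other names are from the excerpt above.
    val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
    var isSendSuccessful = false
    while (isRunning.get() && !isSendSuccessful) {
      try {
        clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
        isSendSuccessful = true
      } catch {
        case e: Throwable =>
          warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
            "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
              request.toString, brokerNode.toString()), e)
          networkClient.close(brokerNode.idString)
          // back off here, then loop and resend the same message
      }
    }
{code}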

This can violate the semantics that developers assumed when writing the 
controller-broker protocol. For example, when the controller communicates with 
a newly joined broker for the first time, it sends the metadata update BEFORE 
sending the LeaderAndIsrRequest.
{code}
  def onBrokerStartup(newBrokers: Seq[Int]) {
    info("New broker startup callback for %s".format(newBrokers.mkString(",")))
    val newBrokersSet = newBrokers.toSet
    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
    // broker via this update.
    // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
    // common controlled shutdown case, the metadata will reach the new brokers faster
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
    // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
    val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
{code}
This ordering is important because, without the metadata cached in the broker, 
a LeaderAndIsrRequest that asks the broker to become a follower would fail, 
since there is no metadata for the leader of the partition.
{code}
        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
          // Only change partition state when the leader is available
          case Some(leaderBroker) =>
...
          case None =>
            // The leader broker should always be present in the metadata cache.
            // If not, we should record the error message and abort the transition process for this partition
            stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
              " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
{code}
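
To make the failure sequence concrete, here is a minimal, self-contained 
simulation of the send loop (plain Scala; all names are hypothetical and this 
is not Kafka code), showing how the dependent request outlives the dropped one:
{code}
import java.util.concurrent.LinkedBlockingQueue

// Toy model of the controller send loop; all names are made up.
object DroppedMessageDemo extends App {
  val queue = new LinkedBlockingQueue[String]()
  queue.put("UpdateMetadataRequest") // must reach the broker first
  queue.put("LeaderAndIsrRequest")   // depends on the request above

  // Pretend the first send times out after 30s, as in the reported scenario.
  def blockingSendAndReceive(request: String): String =
    if (request == "UpdateMetadataRequest")
      throw new java.net.SocketTimeoutException("no response within 30s")
    else s"response to $request"

  while (!queue.isEmpty) {
    val request = queue.take()
    try println(blockingSendAndReceive(request))
    catch {
      case e: Throwable =>
        // current behavior: close and reconnect, but `request` is NOT re-enqueued
        println(s"$request dropped: ${e.getMessage}")
    }
  }
  // The broker only ever sees LeaderAndIsrRequest, which it must reject
  // because the metadata update that should have preceded it was lost.
}
{code}
Running this prints that UpdateMetadataRequest is dropped while 
LeaderAndIsrRequest is still delivered, mirroring the broker-side error above.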