[jira] [Updated] (KAFKA-3963) Missing messages from the controller to brokers

2016-11-26 Thread Ewen Cheslack-Postava (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava updated KAFKA-3963:
-
Fix Version/s: (was: 0.10.1.0)

> Missing messages from the controller to brokers
> ---
>
> Key: KAFKA-3963
> URL: https://issues.apache.org/jira/browse/KAFKA-3963
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Maysam Yabandeh
> Priority: Minor
>
> The controller takes messages from a queue and sends each one to the designated
> broker. If the controller times out waiting for a response from the broker
> (30s), it closes the connection and retries after a backoff period, but it
> does not put the message back on the queue. As a result, the retry starts
> with the next message, and the previous message may never have been
> received by the broker.
> {code}
> val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
> ...
>   try {
>     ...
>       clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
>     ...
>     }
>   } catch {
>     case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
>       warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
>         "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
>         request.toString, brokerNode.toString()), e)
>       networkClient.close(brokerNode.idString)
>       ...
>   }
> {code}
> This could violate the semantics that developers assumed when writing the
> controller-broker protocol. For example, when the controller communicates
> with a newly joined broker for the first time, it sends metadata updates
> BEFORE sending the LeaderAndIsrRequest.
> {code}
>   def onBrokerStartup(newBrokers: Seq[Int]) {
>     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
>     val newBrokersSet = newBrokers.toSet
>     // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
>     // broker via this update.
>     // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
>     // common controlled shutdown case, the metadata will reach the new brokers faster
>     sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
>     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
>     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
>     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
>     replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
> {code}
> This ordering matters because, without the metadata cached on the broker,
> a LeaderAndIsrRequest that asks the broker to become a follower fails:
> the broker has no metadata for the partition's leader.
> {code}
> metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
>   // Only change partition state when the leader is available
>   case Some(leaderBroker) =>
>     ...
>   case None =>
>     // The leader broker should always be present in the metadata cache.
>     // If not, we should record the error message and abort the transition process for this partition
>     stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
>       " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
> {code}
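
To make the failure mode concrete, here is a minimal, self-contained Scala model of the scenario described above. It is a sketch under stated assumptions, not Kafka code: `ControllerQueueSketch`, `UpdateMetadata`, `BecomeFollower`, `Broker` and `run` are hypothetical names standing in for the controller's send queue, UpdateMetadataRequest, LeaderAndIsrRequest and the broker's metadata-cache check. The first send is forced to time out. With `requeueOnFailure = false` the failed request is dropped, as the report describes; with `true` it is pushed back onto the head of a `java.util.concurrent.LinkedBlockingDeque` and retried, which is one possible way to preserve the ordering.

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable

// Hypothetical model of KAFKA-3963. None of these names are Kafka APIs.
object ControllerQueueSketch {

  sealed trait Request
  // Stands in for UpdateMetadataRequest: tells the broker which brokers are alive.
  final case class UpdateMetadata(aliveBrokers: Set[Int]) extends Request
  // Stands in for LeaderAndIsrRequest: asks the broker to follow `leader` for a partition.
  final case class BecomeFollower(partition: String, leader: Int) extends Request

  // Broker side: a follower transition succeeds only if the leader is in the cache.
  final class Broker {
    val aliveBrokers = mutable.Set.empty[Int]
    val followedPartitions = mutable.Set.empty[String]
    val errors = mutable.Buffer.empty[String]

    def handle(req: Request): Unit = req match {
      case UpdateMetadata(alive) =>
        aliveBrokers ++= alive
      case BecomeFollower(p, leader) =>
        if (aliveBrokers.contains(leader)) followedPartitions += p
        else errors += s"cannot become follower for $p: leader $leader unavailable"
    }
  }

  // Drains `requests` in FIFO order. When `failFirstAttempt` is true, the first
  // send "times out". If `requeueOnFailure` is false the request is dropped (the
  // reported behavior); if true it goes back to the head of the deque and is retried.
  def run(requests: Seq[Request], failFirstAttempt: Boolean, requeueOnFailure: Boolean): Broker = {
    val queue = new LinkedBlockingDeque[Request]()
    requests.foreach(r => queue.putLast(r))
    val broker = new Broker
    var attempts = 0
    while (!queue.isEmpty) {
      val req = queue.takeFirst()
      attempts += 1
      val timedOut = failFirstAttempt && attempts == 1
      if (timedOut) {
        // The real controller would close the connection and back off here.
        if (requeueOnFailure) queue.putFirst(req)
      } else {
        broker.handle(req)
      }
    }
    broker
  }
}
```

Running `run(Seq(UpdateMetadata(Set(1, 2)), BecomeFollower("t-0", 1)), failFirstAttempt = true, requeueOnFailure = false)` leaves the broker with one "leader unavailable" error and no followed partitions, while the same call with `requeueOnFailure = true` lets the follower transition succeed. Re-sending at the head of the queue keeps the order intact, but a request that did reach the broker before the timeout may then arrive twice, so this sketch assumes the broker tolerates duplicate requests.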



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3963) Missing messages from the controller to brokers

2016-07-13 Thread Ismael Juma (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3963:
---
Fix Version/s: 0.10.1.0

> Missing messages from the controller to brokers
> ---
>
> Key: KAFKA-3963
> URL: https://issues.apache.org/jira/browse/KAFKA-3963
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Maysam Yabandeh
> Priority: Minor
> Fix For: 0.10.1.0
>


