[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261182#comment-15261182 ]

Kane Kim commented on KAFKA-1382:
---------------------------------

We've been hitting this bug too, on Kafka 0.8.2.1:

[2016-04-27 23:21:13,310] 4843558914 [kafka-scheduler-4] INFO kafka.cluster.Partition - Partition [mp-unknown,220] on broker 104224875: Shrinking ISR for partition [mp-unknown,220] from 104224875,104224876 to 104224875
[2016-04-27 23:21:13,312] 4843558916 [kafka-scheduler-4] INFO kafka.cluster.Partition - Partition [mp-unknown,220] on broker 104224875: Cached zkVersion [5] not equal to that in zookeeper, skip updating ISR

It looks like we had some network problems when this happened. The network recovered, but the broker never rejoined the ISR.

> Update zkVersion on partition state update failures
> ---------------------------------------------------
>
> Key: KAFKA-1382
> URL: https://issues.apache.org/jira/browse/KAFKA-1382
> Project: Kafka
> Issue Type: Bug
> Reporter: Joel Koshy
> Assignee: Sriharsha Chintalapani
> Fix For: 0.8.1.2, 0.8.2.0
> Attachments: KAFKA-1382.patch, KAFKA-1382_2014-05-30_21:19:21.patch, KAFKA-1382_2014-05-31_15:50:25.patch, KAFKA-1382_2014-06-04_12:30:40.patch, KAFKA-1382_2014-06-07_09:00:56.patch, KAFKA-1382_2014-06-09_18:23:42.patch, KAFKA-1382_2014-06-11_09:37:22.patch, KAFKA-1382_2014-06-16_13:50:16.patch, KAFKA-1382_2014-06-16_14:19:27.patch
>
> Our updateIsr code is currently:
>
>   private def updateIsr(newIsr: Set[Replica]) {
>     debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
>     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
>     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
>     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
>       ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
>       ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
>     if (updateSucceeded) {
>       inSyncReplicas = newIsr
>       zkVersion = newVersion
>       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
>     } else {
>       info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
>     }
>   }
>
> We encountered an interesting scenario recently when a large producer fully saturated the broker's NIC for over an hour. The large volume of data led to a number of ISR shrinks (and subsequent expands). The NIC saturation affected the zookeeper client heartbeats and led to a session timeout. The timeline was roughly as follows:
> - Attempt to expand the ISR.
> - The expansion is written to zookeeper (confirmed in the zookeeper transaction logs).
> - Session timeout after around 13 seconds (the configured timeout is 20 seconds), so that lines up.
> - zkclient reconnects to zookeeper (with the same session ID) and retries the write, but uses the old zkVersion. This fails because the zkVersion has already been updated (above).
> - The ISR expansion keeps failing after that, and the only way to get out of it is to bounce the broker.
>
> In the above code, if the zkVersion is different we should probably update the cached version and even retry the expansion until it succeeds.
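To make the suggested behaviour concrete, here is a minimal sketch of a conditional znode update that refreshes the cached version and retries when it is stale. It is written against the plain ZooKeeper client rather than Kafka's ZkUtils, it is not the attached patch, and the method name, path, and data arguments are illustrative placeholders only.

    import org.apache.zookeeper.{KeeperException, ZooKeeper}
    import org.apache.zookeeper.data.Stat

    // Sketch only: retry a conditional setData, refreshing the cached version on a
    // BadVersionException instead of giving up. This mirrors the suggestion above
    // ("update the cached version and even retry the expansion until it succeeds").
    def conditionalUpdateWithRefresh(zk: ZooKeeper, path: String, data: Array[Byte],
                                     cachedVersion: Int): Int = {
      var version = cachedVersion
      var updated = false
      while (!updated) {
        try {
          // setData fails with BadVersionException if 'version' no longer matches the znode.
          val stat = zk.setData(path, data, version)
          version = stat.getVersion
          updated = true
        } catch {
          case _: KeeperException.BadVersionException =>
            // Someone else (e.g. the controller) bumped the znode since we last read it;
            // re-read the current version and try again rather than looping on the stale one.
            val stat = new Stat()
            zk.getData(path, false, stat)
            version = stat.getVersion
        }
      }
      version
    }

Under this assumption, a stale cached zkVersion causes at most one failed write per attempt instead of a permanent "Cached zkVersion [...] not equal to that in zookeeper" loop that only a broker bounce clears.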
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15160030#comment-15160030 ]

dude commented on KAFKA-1382:
------------------------------

We hit this when there was a network problem that prevented Kafka broker 3 from connecting to zookeeper. After the network returned to normal, the broker could not update zookeeper: it looped indefinitely on "Cached zkVersion not equal to that in zookeeper" and could not recover until it was restarted.
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15158799#comment-15158799 ]

Flavio Junqueira commented on KAFKA-1382:
-----------------------------------------

Hi there, it isn't clear to me whether you've hit this bug or whether your broker simply got stale versions. Even with the fix proposed here, you can still get conflicting versions, which will lead to those messages. Was your cluster able to recover after you saw those messages? What is the precise behavior you observed?
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089248#comment-15089248 ]

dude commented on KAFKA-1382:
------------------------------

We hit this bug on Kafka 0.8.2.1 with three nodes; the zookeeper version is 3.4.6. The log is:

[2016-01-05 08:49:27,047] INFO Partition [error-signatureId-956a8fd7-a3ec-4718-bb77-45b3a97eb0cd,0] on broker 3: Shrinking ISR for partition [error-signatureId-956a8fd7-a3ec-4718-bb77-45b3a97eb0cd,0] from 3,1 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:27,227] ERROR Uncaught exception in thread 'kafka-network-thread-39091-0': (kafka.utils.Utils$)
java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:347)
        at scala.None$.get(Option.scala:345)
        at kafka.network.ConnectionQuotas.dec(SocketServer.scala:524)
        at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
        at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
        at kafka.network.Processor.close(SocketServer.scala:374)
        at kafka.network.Processor.processNewResponses(SocketServer.scala:406)
        at kafka.network.Processor.run(SocketServer.scala:318)
        at java.lang.Thread.run(Thread.java:745)
[2016-01-05 08:49:27,248] INFO Reconnect due to socket error: java.io.IOException: connection timeout (kafka.consumer.SimpleConsumer)
[2016-01-05 08:49:27,248] INFO Reconnect due to socket error: java.io.IOException: Connection reset by peer (kafka.consumer.SimpleConsumer)
[2016-01-05 08:49:27,278] ERROR Uncaught exception in thread 'kafka-network-thread-39091-1': (kafka.utils.Utils$)
java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:347)
        at scala.None$.get(Option.scala:345)
        at kafka.network.ConnectionQuotas.dec(SocketServer.scala:524)
        at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
        at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
        at kafka.network.Processor.close(SocketServer.scala:374)
        at kafka.network.Processor.processNewResponses(SocketServer.scala:406)
        at kafka.network.Processor.run(SocketServer.scala:318)
        at java.lang.Thread.run(Thread.java:745)
[2016-01-05 08:49:27,918] INFO re-registering broker info in ZK for broker 3 (kafka.server.KafkaHealthcheck)
[2016-01-05 08:49:36,312] INFO Registered broker 3 at path /brokers/ids/3 with address AI-iPaaS-ATS03:39091. (kafka.utils.ZkUtils$)
[2016-01-05 08:49:36,312] INFO done re-registering broker (kafka.server.KafkaHealthcheck)
[2016-01-05 08:49:36,313] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck)
[2016-01-05 08:49:36,332] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-01-05 08:49:36,343] INFO Partition [error-signatureId-956a8fd7-a3ec-4718-bb77-45b3a97eb0cd,0] on broker 3: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,343] INFO Partition [error-signatureId-e8c1c145-4109-48d8-a46f-4eca92143594,0] on broker 3: Shrinking ISR for partition [error-signatureId-e8c1c145-4109-48d8-a46f-4eca92143594,0] from 3,2 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:36,372] INFO Partition [error-signatureId-e8c1c145-4109-48d8-a46f-4eca92143594,0] on broker 3: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,373] INFO Partition [error-signatureId-59206ee6-e9b7-470d-9b1d-638e2cc7ebad,0] on broker 3: Shrinking ISR for partition [error-signatureId-59206ee6-e9b7-470d-9b1d-638e2cc7ebad,0] from 3,2 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:36,402] INFO Partition [error-signatureId-59206ee6-e9b7-470d-9b1d-638e2cc7ebad,0] on broker 3: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,402] INFO Partition [error-signatureId-be6798c3-57d8-4ddc-a155-04983987b160,0] on broker 3: Shrinking ISR for partition [error-signatureId-be6798c3-57d8-4ddc-a155-04983987b160,0] from 3,1 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:36,426] INFO Partition [error-signatureId-be6798c3-57d8-4ddc-a155-04983987b160,0] on broker 3: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,426] INFO Partition [error-signatureId-38fd31e8-3a0a-4b06-b278-a8f10bab232f,0] on broker 3: Shrinking ISR for partition [error-signatureId-38fd31e
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14138191#comment-14138191 ]

Noah Yetter commented on KAFKA-1382:
------------------------------------

We put a decent amount of effort into replicating this scenario and were unable to. It has only happened to us in the wild, and never for any apparent reason.
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134070#comment-14134070 ]

Sriharsha Chintalapani commented on KAFKA-1382:
------------------------------------------------

[~jhooda] Thanks for the info. I tried to reproduce this by running a 3-node cluster with 3 zookeepers. I ran simple tests such as creating a topic, altering the topic config, producing data into the topic and reading off the topic, shutting down one of the brokers, and also kill -9 on a broker. None of the above cases produced the error. Could you please provide the logs and also say when this error happened?
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134726#comment-14134726 ]

Jagbir commented on KAFKA-1382:
--------------------------------

Hi Sriharsha, thanks for looking into the issue. We are trying to consistently replicate the behavior and will get back. Jagbir
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14133349#comment-14133349 ]

Jagbir commented on KAFKA-1382:
--------------------------------

Not sure if this is related. We applied the final patch on 0.8.1.1 and noticed a null pointer exception that we hadn't noticed earlier:

[2014-09-13 15:26:36,254] ERROR Error processing config change: (kafka.server.TopicConfigManager)
java.lang.NullPointerException
        at scala.collection.convert.Wrappers$JListWrapper.length(Wrappers.scala:85)
        at scala.collection.SeqLike$class.size(SeqLike.scala:106)
        at scala.collection.AbstractSeq.size(Seq.scala:40)
        at kafka.server.TopicConfigManager.kafka$server$TopicConfigManager$$processConfigChanges(TopicConfigManager.scala:89)
        at kafka.server.TopicConfigManager$ConfigChangeListener$.handleChildChange(TopicConfigManager.scala:144)
        at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:570)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2014-09-13 15:26:36,273] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

Our configuration is 3 brokers, 3 zookeepers, and topic replication set to 3. Thanks, jagbir
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14133383#comment-14133383 ]

Sriharsha Chintalapani commented on KAFKA-1382:
------------------------------------------------

[~jhooda] Can you share how you applied this patch against 0.8.1.1? I tried the following on the 0.8.1 branch:

  git apply --check ../KAFKA-1382_2014-06-16_14:19:27.patch

and I get:

  error: patch failed: core/src/main/scala/kafka/cluster/Partition.scala:18
  error: core/src/main/scala/kafka/cluster/Partition.scala: patch does not apply

I also tried it on kafka-0.8.1.1-src.tgz from http://kafka.apache.org/downloads.html and it doesn't apply cleanly.
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14133446#comment-14133446 ]

Jagbir commented on KAFKA-1382:
--------------------------------

Hi Sriharsha, this is what I did:

  git clone http://git-wip-us.apache.org/repos/asf/kafka.git
  git checkout refs/tags/0.8.1.1 -b kafka-1382
  wget https://issues.apache.org/jira/secure/attachment/12646741/KAFKA-1382.patch
  patch -p1 < KAFKA-1382.patch

Thanks, Jagbir
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14131013#comment-14131013 ]

Jagbir commented on KAFKA-1382:
--------------------------------

We are in the same fix. Can you please comment on whether this can be patched safely onto 0.8.1.1?
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14120028#comment-14120028 ]

Noah Yetter commented on KAFKA-1382:
------------------------------------

Is it possible to apply this patch to 0.8.1.1? Or better yet, can we get a 0.8.1.2 release with this patch included? We have experienced this bug multiple times in production, causing data loss. We had been taking the approach of waiting for 0.8.2 and crossing our fingers, but that release no longer has even a target release date on the roadmap.
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14036082#comment-14036082 ]

Sriharsha Chintalapani commented on KAFKA-1382:
------------------------------------------------

[~junrao] Can you please take a look at the latest patch and let me know if it looks good or not? Thanks, Harsha
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14036932#comment-14036932 ]

Neha Narkhede commented on KAFKA-1382:
---------------------------------------

Thanks for the follow-up patch. Pushed to trunk.
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14036964#comment-14036964 ]

Jun Rao commented on KAFKA-1382:
---------------------------------

Thanks for the patch. This looks good to me.
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14032927#comment-14032927 ]

Sriharsha Chintalapani commented on KAFKA-1382:
------------------------------------------------

Updated reviewboard https://reviews.apache.org/r/21899/diff/ against branch origin/trunk.
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14032963#comment-14032963 ]

Sriharsha Chintalapani commented on KAFKA-1382:
------------------------------------------------

Updated reviewboard https://reviews.apache.org/r/21899/diff/ against branch origin/trunk.
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14027984#comment-14027984 ]

Sriharsha Chintalapani commented on KAFKA-1382:
------------------------------------------------

Updated reviewboard https://reviews.apache.org/r/21899/diff/ against branch origin/trunk.
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14027099#comment-14027099 ]

Neha Narkhede commented on KAFKA-1382:
---------------------------------------

Oops, I'm sorry, I didn't see Jun's latest comments before committing. I guess we need a follow-up patch with Jun's suggestions addressed.
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14027110#comment-14027110 ] Sriharsha Chintalapani commented on KAFKA-1382: --- [~nehanarkhede] I am working on a patch based on Jun's comments and will send an updated patch.
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020857#comment-14020857 ] Sriharsha Chintalapani commented on KAFKA-1382: --- Updated reviewboard https://reviews.apache.org/r/21899/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14014854#comment-14014854 ] Sriharsha Chintalapani commented on KAFKA-1382: --- Updated reviewboard https://reviews.apache.org/r/21899/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14014511#comment-14014511 ] Sriharsha Chintalapani commented on KAFKA-1382: --- Updated reviewboard https://reviews.apache.org/r/21899/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14008575#comment-14008575 ] Sriharsha Chintalapani commented on KAFKA-1382: --- Created reviewboard https://reviews.apache.org/r/21899/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13984416#comment-13984416 ] James Blackburn commented on KAFKA-1382: The other thing about this is that it can quickly churn through logs. There are log lines every few milliseconds at INFO level, leading to GBs of logs in a very short time, e.g.:
{code}
[2014-04-28 00:01:37,010] INFO Partition [RSF_OPTIONS,10] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,017] INFO Partition [RSF_OPTIONS,10] on broker 1: Expanding ISR for partition [RSF_OPTIONS,10] from 1 to 1,0 (kafka.cluster.Partition)
[2014-04-28 00:01:37,019] ERROR Conditional update of path /brokers/topics/RSF_OPTIONS/partitions/10/state with data {"controller_epoch":19,"leader":1,"version":1,"leader_epoch":6,"isr":[1,0]} and expected version 21 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/RSF_OPTIONS/partitions/10/state (kafka.utils.ZkUtils$)
[2014-04-28 00:01:37,019] INFO Partition [RSF_OPTIONS,10] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,019] INFO Partition [RSF,14] on broker 1: Expanding ISR for partition [RSF,14] from 1 to 1,0 (kafka.cluster.Partition)
[2014-04-28 00:01:37,020] ERROR Conditional update of path /brokers/topics/RSF/partitions/14/state with data {"controller_epoch":19,"leader":1,"version":1,"leader_epoch":6,"isr":[1,0]} and expected version 21 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/RSF/partitions/14/state (kafka.utils.ZkUtils$)
[2014-04-28 00:01:37,020] INFO Partition [RSF,14] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,035] INFO Partition [RSF_OPTIONS,10] on broker 1: Expanding ISR for partition [RSF_OPTIONS,10] from 1 to 1,0 (kafka.cluster.Partition)
[2014-04-28 00:01:37,037] ERROR Conditional update of path /brokers/topics/RSF_OPTIONS/partitions/10/state with data {"controller_epoch":19,"leader":1,"version":1,"leader_epoch":6,"isr":[1,0]} and expected version 21 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/RSF_OPTIONS/partitions/10/state (kafka.utils.ZkUtils$)
[2014-04-28 00:01:37,037] INFO Partition [RSF_OPTIONS,10] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,037] INFO Partition [RSF,14] on broker 1: Expanding ISR for partition [RSF,14] from 1 to 1,0 (kafka.cluster.Partition)
{code}
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13966743#comment-13966743 ] Jun Rao commented on KAFKA-1382: Joel, Yes, that's a good point. When the leader tries to update the isr and sees a zk version conflict, we could read the data back and make sure the content is the same, including the leader epoch. If so, we can assume the previous update actually succeeded. This should be easy to patch.
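For illustration, a minimal sketch of the read-back check described above. The IsrState case class and the zkRead callback are stand-ins for Kafka's LeaderAndIsr and ZkUtils plumbing, not the actual patch:
{code}
// Sketch only: on a version conflict, read the znode back and, if its content
// (leader, leader epoch, ISR) equals what this broker tried to write, treat the
// earlier write as having succeeded and adopt the stored zkVersion.
case class IsrState(leader: Int, leaderEpoch: Int, isr: List[Int])

def resolveVersionConflict(attempted: IsrState,
                           zkRead: () => (IsrState, Int)): Option[Int] = {
  val (stored, storedVersion) = zkRead()          // stand-in for a ZK read returning (data, version)
  if (stored == attempted) Some(storedVersion)    // our write landed; refresh the cached version
  else None                                       // a different writer (e.g. the controller) won; skip
}
{code}
If None is returned, the broker would keep its cached version and wait for the controller's state change request, which is the behaviour the later comments in this thread debate.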
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965431#comment-13965431 ] Jun Rao commented on KAFKA-1382: It sounds like we get a ZK connectionLossException, instead of a session expiration. If it's the latter, the session ID would be different. ConnectionLossException is handled in zkclient by simply retrying the operation. This doesn't quite work for conditional updates since if the previous operation succeeded, the underlying ZK version would have changed. We can't just blindly update the cached zkVersion either since the zkVersion could be different because the controller has changed the zk path at the same time, which takes precedence. Not sure what's the best way to fix this. One way is to add some metadata in the zk value to indicate the id of the writer. Then, we can use that information to verify if the last (potentially conflicting) update was made by the caller itself or not. We will have to add such metadata in a backward compatible way.
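A rough sketch of that writer-metadata idea, assuming a hypothetical extra "writer" field in the leader-and-ISR data (no such field exists in the 0.8.x format; as noted above it would have to be added in a backward compatible way):
{code}
// Hypothetical: the znode value carries the broker id of the last writer, so a broker
// that hits a version conflict after a zkclient retry can tell whether the conflicting
// write was in fact its own earlier write rather than a controller update.
case class LeaderAndIsrData(leader: Int, leaderEpoch: Int, isr: List[Int],
                            controllerEpoch: Int, writer: Option[Int])

def conflictCausedByOwnRetry(stored: LeaderAndIsrData,
                             attempted: LeaderAndIsrData,
                             localBrokerId: Int): Boolean = {
  stored.writer == Some(localBrokerId) &&
    stored.leaderEpoch == attempted.leaderEpoch &&
    stored.isr == attempted.isr
}
{code}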
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965445#comment-13965445 ] Guozhang Wang commented on KAFKA-1382: -- Could we patch the controller such that when such an ISR write has failed, it reads the stored value, and if that is the same as the value it is trying to write, lets it pass, just like the broker registration function?
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965456#comment-13965456 ] Jun Rao commented on KAFKA-1382: Controller's update to isr will always succeed. It keeps re-reading the current value in ZK and then does the conditional update. The leader's update to isr has to fail if the previous update was made by the controller. Checking the value itself will work for most cases, but may not cover 100%. It can happen that the controller updated isr to the same value that the leader wants to write. In this case, we still want to fail the update made by the leader.
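The controller-side pattern referred to above, reduced to a sketch (the read and conditional-write functions are stand-ins, not real ZkUtils signatures): re-read the current version, attempt the conditional update against it, and repeat until it sticks.
{code}
// Sketch of a retry-until-success conditional update, as the controller effectively does.
def writeUntilSuccess(newValue: String,
                      readVersion: () => Int,
                      conditionalWrite: (String, Int) => Boolean): Unit = {
  var succeeded = false
  while (!succeeded) {
    val version = readVersion()                       // re-read the current zkVersion
    succeeded = conditionalWrite(newValue, version)   // conditional update against that version
  }
}
{code}
A partition leader cannot safely use the same loop, which is exactly the point of the comment above: if the conflicting write came from the controller, the leader must fail and wait for the controller's decision rather than retry.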
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965458#comment-13965458 ] Guozhang Wang commented on KAFKA-1382: -- Yeah, I meant to say broker in the last comment. Why, in this corner case, do we still want to fail the update instead of just letting it pass?
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965567#comment-13965567 ] Jun Rao commented on KAFKA-1382: Basically, once a controller has updated isr, the leader can't touch it until it has received the latest info from the controller. Say a leader had a zk version conflict when updating isr, and it sees the latest value (written by the controller) is the same as the value it's trying to write, and we allow it. What could happen is that the leader could successfully shrink the isr again before the controller's decision is propagated to all brokers. Once the leader shrinks the isr, it can commit new messages, which may not be in the new leader that the controller selected. We have lost committed data at this point.
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965706#comment-13965706 ] Joel Koshy commented on KAFKA-1382: --- We do have the leader epoch maintained for each partition - so we won't blindly update the zkVersion but only update it if the leaderEpoch stored in zookeeper is what we currently know.
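A minimal sketch of that guard, using stand-in types rather than the real Partition/ZkUtils code: after a failed conditional update, only adopt the zkVersion read back from ZooKeeper when the stored leader epoch still matches the epoch this replica knows; otherwise keep the cached version and wait for the controller.
{code}
// Sketch only: refresh the cached zkVersion after a failed conditional update,
// but only if the leader epoch in ZooKeeper matches the locally known epoch.
case class StoredLeaderAndIsr(leaderEpoch: Int, isr: List[Int], zkVersion: Int)

def maybeRefreshCachedZkVersion(stored: StoredLeaderAndIsr,
                                localLeaderEpoch: Int,
                                cachedZkVersion: Int): Int = {
  if (stored.leaderEpoch == localLeaderEpoch) stored.zkVersion  // safe to resync and retry the update
  else cachedZkVersion                                          // controller moved the epoch forward; leave the cache alone
}
{code}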
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13965027#comment-13965027 ] Guozhang Wang commented on KAFKA-1382: -- When the session timeout happens, would the controller's session timeout listener be fired? If yes, would we just increment the zkVersion in the callback function?