[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2016-04-27 Thread Kane Kim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261182#comment-15261182
 ] 

Kane Kim commented on KAFKA-1382:
-

We've been hitting this bug too on kafka 0.8.2.1:

[2016-04-27 23:21:13,310]  4843558914 [kafka-scheduler-4] INFO  kafka.cluster.Partition  - Partition [mp-unknown,220] on broker 104224875: Shrinking ISR for partition [mp-unknown,220] from 104224875,104224876 to 104224875
[2016-04-27 23:21:13,312]  4843558916 [kafka-scheduler-4] INFO  kafka.cluster.Partition  - Partition [mp-unknown,220] on broker 104224875: Cached zkVersion [5] not equal to that in zookeeper, skip updating ISR

It looks like we had some network problems when this happened. The network was restored, but the broker never rejoined the ISR.

> Update zkVersion on partition state update failures
> ---
>
> Key: KAFKA-1382
> URL: https://issues.apache.org/jira/browse/KAFKA-1382
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Sriharsha Chintalapani
> Fix For: 0.8.1.2, 0.8.2.0
>
> Attachments: KAFKA-1382.patch, KAFKA-1382_2014-05-30_21:19:21.patch, 
> KAFKA-1382_2014-05-31_15:50:25.patch, KAFKA-1382_2014-06-04_12:30:40.patch, 
> KAFKA-1382_2014-06-07_09:00:56.patch, KAFKA-1382_2014-06-09_18:23:42.patch, 
> KAFKA-1382_2014-06-11_09:37:22.patch, KAFKA-1382_2014-06-16_13:50:16.patch, 
> KAFKA-1382_2014-06-16_14:19:27.patch
>
>
> Our updateIsr code is currently:
>   private def updateIsr(newIsr: Set[Replica]) {
>     debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
>     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
>     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
>     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
>       ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
>       ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
>     if (updateSucceeded) {
>       inSyncReplicas = newIsr
>       zkVersion = newVersion
>       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
>     } else {
>       info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
>     }
> We encountered an interesting scenario recently when a large producer fully
> saturated the broker's NIC for over an hour. The large volume of data led to
> a number of ISR shrinks (and subsequent expands). The NIC saturation
> affected the zookeeper client heartbeats and led to a session timeout. The
> timeline was roughly as follows:
> - Attempt to expand ISR
> - Expansion written to zookeeper (confirmed in zookeeper transaction logs)
> - Session timeout after around 13 seconds (the configured timeout is 20
>   seconds) so that lines up.
> - zkclient reconnects to zookeeper (with the same session ID) and retries
>   the write - but uses the old zkVersion. This fails because the zkVersion
>   has already been updated (above).
> - The ISR expand keeps failing after that and the only way to get out of it
>   is to bounce the broker.
> In the above code, if the zkVersion is different we should probably update
> the cached version and even retry the expansion until it succeeds.
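
For illustration only, here is a minimal sketch of the retry-and-refresh idea suggested above (not the committed patch; the helper names tryUpdate and readVersion are hypothetical stand-ins for the conditional write and for re-reading the znode's current version): on a conditional-update failure the broker refreshes its cached zkVersion from ZooKeeper and retries, instead of holding the stale version until a restart.

  // Sketch only: generic retry loop for a ZooKeeper-style conditional update.
  object ConditionalUpdateRetry {
    // tryUpdate(cachedVersion) performs the conditional write and returns
    // (succeeded, newVersion); readVersion() re-reads the znode's current
    // version so the cache can be refreshed after a conflict.
    def updateWithRefresh(initialVersion: Int,
                          tryUpdate: Int => (Boolean, Int),
                          readVersion: () => Int,
                          maxRetries: Int = 3): Option[Int] = {
      var cached = initialVersion
      var attempt = 0
      while (attempt <= maxRetries) {
        val (succeeded, newVersion) = tryUpdate(cached)
        if (succeeded)
          return Some(newVersion)
        // The conditional write failed, so the cached version is stale:
        // refresh it from ZooKeeper rather than skipping the update forever.
        cached = readVersion()
        attempt += 1
      }
      None // give up; the caller keeps its old ISR and logs a warning
    }
  }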





[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2016-02-23 Thread dude (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15160030#comment-15160030
 ] 

dude commented on KAFKA-1382:
-

We hit this when there was a network problem that prevented Kafka broker 3 from connecting to ZooKeeper. After the network returned to normal, the broker could not update ZooKeeper and looped on the "cached zkVersion not equal to that in zookeeper" message indefinitely; the broker could not recover until it was restarted.



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2016-02-23 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158799#comment-15158799
 ] 

Flavio Junqueira commented on KAFKA-1382:
-

Hi there, it isn't clear to me whether you've hit this bug or whether your broker simply got stale versions. Even with the fix proposed here you can still get conflicting versions, which will lead to those messages. Was your cluster able to recover after you saw those messages? What is the precise behavior you observed?



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2016-01-08 Thread dude (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15089248#comment-15089248
 ] 

dude commented on KAFKA-1382:
-

We hit this bug on Kafka 0.8.2.1 with three nodes; the ZooKeeper version is 3.4.6. The log is:


[2016-01-05 08:49:27,047] INFO Partition [error-signatureId-956a8fd7-a3ec-4718-bb77-45b3a97eb0cd,0] on broker 3: Shrinking ISR for partition [error-signatureId-956a8fd7-a3ec-4718-bb77-45b3a97eb0cd,0] from 3,1 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:27,227] ERROR Uncaught exception in thread 'kafka-network-thread-39091-0': (kafka.utils.Utils$)
java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at kafka.network.ConnectionQuotas.dec(SocketServer.scala:524)
    at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
    at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
    at kafka.network.Processor.close(SocketServer.scala:374)
    at kafka.network.Processor.processNewResponses(SocketServer.scala:406)
    at kafka.network.Processor.run(SocketServer.scala:318)
    at java.lang.Thread.run(Thread.java:745)
[2016-01-05 08:49:27,248] INFO Reconnect due to socket error: java.io.IOException: connection timeout (kafka.consumer.SimpleConsumer)
[2016-01-05 08:49:27,248] INFO Reconnect due to socket error: java.io.IOException: Connection reset by peer (kafka.consumer.SimpleConsumer)
[2016-01-05 08:49:27,278] ERROR Uncaught exception in thread 'kafka-network-thread-39091-1': (kafka.utils.Utils$)
java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at kafka.network.ConnectionQuotas.dec(SocketServer.scala:524)
    at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
    at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
    at kafka.network.Processor.close(SocketServer.scala:374)
    at kafka.network.Processor.processNewResponses(SocketServer.scala:406)
    at kafka.network.Processor.run(SocketServer.scala:318)
    at java.lang.Thread.run(Thread.java:745)
[2016-01-05 08:49:27,918] INFO re-registering broker info in ZK for broker 3 (kafka.server.KafkaHealthcheck)
[2016-01-05 08:49:36,312] INFO Registered broker 3 at path /brokers/ids/3 with address AI-iPaaS-ATS03:39091. (kafka.utils.ZkUtils$)
[2016-01-05 08:49:36,312] INFO done re-registering broker (kafka.server.KafkaHealthcheck)
[2016-01-05 08:49:36,313] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck)
[2016-01-05 08:49:36,332] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-01-05 08:49:36,343] INFO Partition [error-signatureId-956a8fd7-a3ec-4718-bb77-45b3a97eb0cd,0] on broker 3: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,343] INFO Partition [error-signatureId-e8c1c145-4109-48d8-a46f-4eca92143594,0] on broker 3: Shrinking ISR for partition [error-signatureId-e8c1c145-4109-48d8-a46f-4eca92143594,0] from 3,2 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:36,372] INFO Partition [error-signatureId-e8c1c145-4109-48d8-a46f-4eca92143594,0] on broker 3: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,373] INFO Partition [error-signatureId-59206ee6-e9b7-470d-9b1d-638e2cc7ebad,0] on broker 3: Shrinking ISR for partition [error-signatureId-59206ee6-e9b7-470d-9b1d-638e2cc7ebad,0] from 3,2 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:36,402] INFO Partition [error-signatureId-59206ee6-e9b7-470d-9b1d-638e2cc7ebad,0] on broker 3: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,402] INFO Partition [error-signatureId-be6798c3-57d8-4ddc-a155-04983987b160,0] on broker 3: Shrinking ISR for partition [error-signatureId-be6798c3-57d8-4ddc-a155-04983987b160,0] from 3,1 to 3 (kafka.cluster.Partition)
[2016-01-05 08:49:36,426] INFO Partition [error-signatureId-be6798c3-57d8-4ddc-a155-04983987b160,0] on broker 3: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-01-05 08:49:36,426] INFO Partition [error-signatureId-38fd31e8-3a0a-4b06-b278-a8f10bab232f,0] on broker 3: Shrinking ISR for partition [error-signatureId-38fd31e



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-09-17 Thread Noah Yetter (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14138191#comment-14138191
 ] 

Noah Yetter commented on KAFKA-1382:


We put a decent amount of effort into replicating this scenario and were unable to. It has only happened to us in the wild, and never for any apparent reason.



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-09-15 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14134070#comment-14134070
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

[~jhooda]
Thanks for the info. I tried to reproduce this by running a 3-node cluster with 3 ZooKeeper nodes.
I ran simple tests such as creating a topic, altering the topic config, producing data into the topic and reading off the topic, shutting down one of the brokers, and also kill -9'ing a broker. None of these cases produced the error. Could you please provide the logs and also note when this error happened?



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-09-15 Thread Jagbir (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14134726#comment-14134726
 ] 

Jagbir commented on KAFKA-1382:
---

Hi Sriharsha,

Thanks for looking into the issue. We are trying to consistently replicate the 
behavior and will get back.

Jagbir



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-09-14 Thread Jagbir (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14133349#comment-14133349
 ] 

Jagbir commented on KAFKA-1382:
---

Not sure if this is related. We applied the final patch on 0.8.1.1 and noticed a NullPointerException that we hadn't seen earlier:

--8<--
[2014-09-13 15:26:36,254] ERROR Error processing config change: (kafka.server.TopicConfigManager)
java.lang.NullPointerException
    at scala.collection.convert.Wrappers$JListWrapper.length(Wrappers.scala:85)
    at scala.collection.SeqLike$class.size(SeqLike.scala:106)
    at scala.collection.AbstractSeq.size(Seq.scala:40)
    at kafka.server.TopicConfigManager.kafka$server$TopicConfigManager$$processConfigChanges(TopicConfigManager.scala:89)
    at kafka.server.TopicConfigManager$ConfigChangeListener$.handleChildChange(TopicConfigManager.scala:144)
    at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:570)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2014-09-13 15:26:36,273] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
--8<--

Our configuration is 3 brokers, 3 ZooKeeper nodes, and topic replication factor set to 3.

Thanks,
jagbir



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-09-14 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14133383#comment-14133383
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

[~jhooda]
Can you share how you applied this patch against 0.8.1.1? I tried the following on the 0.8.1 branch:
git apply --check ../KAFKA-1382_2014-06-16_14:19:27.patch
and I get:
error: patch failed: core/src/main/scala/kafka/cluster/Partition.scala:18
error: core/src/main/scala/kafka/cluster/Partition.scala: patch does not apply

I also tried it against kafka-0.8.1.1-src.tgz from http://kafka.apache.org/downloads.html and it doesn't apply cleanly.




[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-09-14 Thread Jagbir (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14133446#comment-14133446
 ] 

Jagbir commented on KAFKA-1382:
---

Hi Sriharsha,

This is what I did

prompt> git clone http://git-wip-us.apache.org/repos/asf/kafka.git
prompt> git checkout refs/tags/0.8.1.1 -b kafka-1382
prompt> wget https://issues.apache.org/jira/secure/attachment/12646741/KAFKA-1382.patch
prompt> patch -p1 < KAFKA-1382.patch

Thanks,
Jagbir



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-09-11 Thread Jagbir (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14131013#comment-14131013
 ] 

Jagbir commented on KAFKA-1382:
---

We are in the same fix. Can you please comment on whether this can be patched safely onto 0.8.1.1?



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-09-03 Thread Noah Yetter (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14120028#comment-14120028
 ] 

Noah Yetter commented on KAFKA-1382:


Is it possible to apply this patch to 0.8.1.1?  Or better yet, can we get a 
0.8.1.2 release with this patch included?

We have experienced this bug multiple times in production, causing data loss.  
We had been taking the approach of waiting for 0.8.2 and crossing our fingers, 
but that release no longer has even a target release date on the roadmap.



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-18 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14036082#comment-14036082
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

[~junrao] Can you please take a look at the latest patch and let me know if it 
looks good or not.
Thanks,
Harsha



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-18 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14036932#comment-14036932
 ] 

Neha Narkhede commented on KAFKA-1382:
--

Thanks for the follow-up patch. Pushed to trunk.



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-18 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14036964#comment-14036964
 ] 

Jun Rao commented on KAFKA-1382:


Thanks for the patch. This looks good to me.



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-16 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14032927#comment-14032927
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

Updated reviewboard https://reviews.apache.org/r/21899/diff/
 against branch origin/trunk



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-16 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14032963#comment-14032963
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

Updated reviewboard https://reviews.apache.org/r/21899/diff/
 against branch origin/trunk



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-11 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027984#comment-14027984
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

Updated reviewboard https://reviews.apache.org/r/21899/diff/
 against branch origin/trunk



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-10 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027099#comment-14027099
 ] 

Neha Narkhede commented on KAFKA-1382:
--

Oops, I'm sorry, I didn't see Jun's latest comments before committing. I guess 
we need a follow-up patch with Jun's suggestions addressed.



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-10 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027110#comment-14027110
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

[~nehanarkhede] I am working on a patch based on Jun's comments. Will send an 
updated patch.



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-07 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020857#comment-14020857
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

Updated reviewboard https://reviews.apache.org/r/21899/diff/
 against branch origin/trunk



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-05-31 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14014854#comment-14014854
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

Updated reviewboard https://reviews.apache.org/r/21899/diff/
 against branch origin/trunk



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-05-30 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14014511#comment-14014511
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

Updated reviewboard https://reviews.apache.org/r/21899/diff/
 against branch origin/trunk



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-05-25 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14008575#comment-14008575
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

Created reviewboard https://reviews.apache.org/r/21899/diff/
 against branch origin/trunk



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-04-29 Thread James Blackburn (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13984416#comment-13984416
 ] 

James Blackburn commented on KAFKA-1382:


The other thing about this is that it can quickly churn through logs. There are 
log lines every few milliseconds at INFO level, leading to GBs of log in a very 
short time, e.g.:

{code}
[2014-04-28 00:01:37,010] INFO Partition [RSF_OPTIONS,10] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,017] INFO Partition [RSF_OPTIONS,10] on broker 1: Expanding ISR for partition [RSF_OPTIONS,10] from 1 to 1,0 (kafka.cluster.Partition)
[2014-04-28 00:01:37,019] ERROR Conditional update of path /brokers/topics/RSF_OPTIONS/partitions/10/state with data {"controller_epoch":19,"leader":1,"version":1,"leader_epoch":6,"isr":[1,0]} and expected version 21 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/RSF_OPTIONS/partitions/10/state (kafka.utils.ZkUtils$)
[2014-04-28 00:01:37,019] INFO Partition [RSF_OPTIONS,10] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,019] INFO Partition [RSF,14] on broker 1: Expanding ISR for partition [RSF,14] from 1 to 1,0 (kafka.cluster.Partition)
[2014-04-28 00:01:37,020] ERROR Conditional update of path /brokers/topics/RSF/partitions/14/state with data {"controller_epoch":19,"leader":1,"version":1,"leader_epoch":6,"isr":[1,0]} and expected version 21 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/RSF/partitions/14/state (kafka.utils.ZkUtils$)
[2014-04-28 00:01:37,020] INFO Partition [RSF,14] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,035] INFO Partition [RSF_OPTIONS,10] on broker 1: Expanding ISR for partition [RSF_OPTIONS,10] from 1 to 1,0 (kafka.cluster.Partition)
[2014-04-28 00:01:37,037] ERROR Conditional update of path /brokers/topics/RSF_OPTIONS/partitions/10/state with data {"controller_epoch":19,"leader":1,"version":1,"leader_epoch":6,"isr":[1,0]} and expected version 21 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/RSF_OPTIONS/partitions/10/state (kafka.utils.ZkUtils$)
[2014-04-28 00:01:37,037] INFO Partition [RSF_OPTIONS,10] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,037] INFO Partition [RSF,14] on broker 1: Expanding ISR for partition [RSF,14] from 1 to 1,0 (kafka.cluster.Partition)
{code}


[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-04-11 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13966743#comment-13966743
 ] 

Jun Rao commented on KAFKA-1382:


Joel,

Yes, that's a good point. When the leader tries to update the isr and sees a zk 
version conflict, we could read the data back and make sure the content is the 
same, including the leader epoch. If so, we can assume the previous update 
actually succeeded. This should be easy to patch.
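
A minimal sketch of the read-back check described above, written against the 
raw ZooKeeper client rather than Kafka's ZkUtils; the method name and the 
Option return value are illustrative assumptions, not anything in the Kafka 
codebase:

{code}
import org.apache.zookeeper.{KeeperException, ZooKeeper}
import org.apache.zookeeper.data.Stat

// Attempt a conditional write; on a version conflict, re-read the node and
// treat the write as having succeeded if the stored bytes are identical to
// what we tried to write (i.e. our earlier attempt was already applied).
// Returns Some(newZkVersion) on success, None if someone else changed the node.
def conditionalWriteWithReadBack(zk: ZooKeeper, path: String,
                                 data: Array[Byte], expectedVersion: Int): Option[Int] = {
  try {
    Some(zk.setData(path, data, expectedVersion).getVersion)
  } catch {
    case _: KeeperException.BadVersionException =>
      val stat = new Stat()
      val stored = zk.getData(path, false, stat)
      // The stored value includes the leader epoch, so a byte-for-byte match
      // also implies the epoch matches.
      if (java.util.Arrays.equals(stored, data)) Some(stat.getVersion) else None
  }
}
{code}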



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-04-10 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13965431#comment-13965431
 ] 

Jun Rao commented on KAFKA-1382:


It sounds like we get a ZK ConnectionLossException, instead of a session 
expiration. If it were the latter, the session ID would be different. 
ConnectionLossException is handled in zkclient by simply retrying the 
operation. This doesn't quite work for conditional updates, since if the 
previous operation succeeded, the underlying ZK version would have changed. We 
can't just blindly update the cached zkVersion either, since the zkVersion 
could be different because the controller changed the zk path at the same 
time, and the controller's update takes precedence.

Not sure what the best way to fix this is. One way is to add some metadata to 
the zk value to indicate the id of the writer. Then we can use that 
information to verify whether the last (potentially conflicting) update was 
made by the caller itself or not. We would have to add such metadata in a 
backward-compatible way.
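
A hedged sketch of the writer-id idea above; neither the field nor these types 
exist in Kafka, they are only meant to make the proposal concrete:

{code}
// Hypothetical partition-state value with an optional writer field. Data
// written without the field parses with writerBrokerId = None, which keeps
// the change backward compatible.
case class PartitionStateValue(leader: Int, leaderEpoch: Int, isr: List[Int],
                               controllerEpoch: Int, writerBrokerId: Option[Int])

// On a version conflict, the broker could use the writer field to decide
// whether the conflicting write was its own retried write (safe to adopt the
// new zkVersion) or a controller update (which must take precedence).
def conflictingUpdateWasOurs(stored: PartitionStateValue,
                             attempted: PartitionStateValue,
                             localBrokerId: Int): Boolean =
  stored.writerBrokerId == Some(localBrokerId) &&
    stored.copy(writerBrokerId = None) == attempted.copy(writerBrokerId = None)
{code}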



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-04-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13965445#comment-13965445
 ] 

Guozhang Wang commented on KAFKA-1382:
--

Could we patch the controller so that, when such an ISR write fails, it reads 
the stored value and, if it is the same as the value it was trying to write, 
lets it pass, just like the broker registration function?



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-04-10 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13965456#comment-13965456
 ] 

Jun Rao commented on KAFKA-1382:


Controller's update to isr will always succeed. It keeps re-reading the current 
value in ZK and then does the conditional update. The leader's update to isr 
has to fail if the previous update was made by the controller. Checking the 
value itself will work for most cases, but may not cover 100%. It can happen 
that the controller updated isr to the same value that the leader wants to 
write. In this case, we still want to fail the update made by the leader.



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-04-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13965458#comment-13965458
 ] 

Guozhang Wang commented on KAFKA-1382:
--

Yeah, I meant to say broker in the last comment.

Why, in this corner case, do we still want to fail the update instead of just 
letting it pass?



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-04-10 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13965567#comment-13965567
 ] 

Jun Rao commented on KAFKA-1382:


Basically, once a controller has updated the isr, the leader can't touch it 
until it has received the latest info from the controller. Say a leader had a 
zk version conflict when updating the isr, and it sees that the latest value 
(written by the controller) is the same as the value it's trying to write, and 
we allow it. What could happen is that the leader could successfully shrink 
the isr again before the controller's decision is propagated to all brokers. 
Once the leader shrinks the isr, it can commit new messages, which may not be 
on the new leader that the controller selected. We would have lost committed 
data at this point.



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-04-10 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13965706#comment-13965706
 ] 

Joel Koshy commented on KAFKA-1382:
---

We do have the leader epoch maintained for each partition - so we won't blindly 
update the zkVersion but only update it if the leaderEpoch stored in zookeeper 
is what we currently know.
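
A rough sketch of the guard described here, with a hypothetical read helper 
standing in for a read of the partition state path; it only illustrates the 
epoch check, not the committed fix:

{code}
// readLeaderEpochAndZkVersion stands in for reading
// /brokers/topics/<topic>/partitions/<partition>/state and extracting the
// stored leader_epoch together with the node's zkVersion.
def refreshCachedZkVersion(readLeaderEpochAndZkVersion: () => (Int, Int),
                           knownLeaderEpoch: Int,
                           cachedZkVersion: Int): Int = {
  val (storedLeaderEpoch, storedZkVersion) = readLeaderEpochAndZkVersion()
  if (storedLeaderEpoch == knownLeaderEpoch)
    storedZkVersion  // epoch unchanged, so adopt the stored version and retry the ISR update
  else
    cachedZkVersion  // the controller moved the partition forward; wait for its LeaderAndIsr request
}
{code}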



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-04-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13965027#comment-13965027
 ] 

Guozhang Wang commented on KAFKA-1382:
--

When the session timeout happens, would the controller's session timeout 
listener be fired? If yes, would we just increment the zkVersion in the 
callback function?
