[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15037210#comment-15037210
 ] 

ASF GitHub Bot commented on KAFKA-2308:
---

Github user darionyaphet commented on the pull request:

https://github.com/apache/storm/pull/801#issuecomment-161509467
  
Hi @knusbaum @revans2, I read the `Kafka Release Notes Version 0.8.2.2` and 
found a fixed bug 
([KAFKA-2308](https://issues.apache.org/jira/browse/KAFKA-2308)) where the new 
producer hits Snappy un-compression errors after a Kafka broker restarts, so I 
think it may be useful here.


> New producer + Snappy face un-compression errors after broker restart
> -
>
> Key: KAFKA-2308
> URL: https://issues.apache.org/jira/browse/KAFKA-2308
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.9.0.0, 0.8.2.2
>
> Attachments: KAFKA-2308.patch
>
>
> Looks like the new producer, when used with Snappy, sends messages the 
> brokers can't decompress after a broker restart. This issue was discussed in 
> a few mailing list threads, but I don't think we ever resolved it.
> I can reproduce it with trunk and Snappy 1.1.1.7.
> To reproduce:
> 1. Start 3 brokers.
> 2. Create a topic with 3 partitions and a replication factor of 3.
> 3. Start the performance producer with --new-producer --compression-codec 2 
> (and set the number of messages fairly high, to give yourself time; I went 
> with 10M).
> 4. Bounce one of the brokers.
> 5. The log of one of the surviving nodes should contain errors like:
> {code}
> 2015-07-02 13:45:59,300 ERROR kafka.server.ReplicaManager: [Replica Manager 
> on Broker 66]: Error processing append operation on partition [t3,0]
> kafka.common.KafkaException:
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:94)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:64)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.innerDone(ByteBufferMessageSet.scala:177)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:218)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:173)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at 
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at 
> kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:267)
> at kafka.log.Log.liftedTree1$1(Log.scala:327)
> at kafka.log.Log.append(Log.scala:326)
> at 
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:423)
> at 
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:409)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
> at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:409)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at 

[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-12-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034497#comment-15034497
 ] 

Guozhang Wang commented on KAFKA-2308:
--

[~allenxwang] This bug should only be in the producer, due to its usage pattern 
of Snappy.


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-12-01 Thread Allen Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034439#comment-15034439
 ] 

Allen Wang commented on KAFKA-2308:
---

[~gwenshap] [~guozhang] Is the fix in the producer only? If I use the 0.8.2.2 
producer, do I also need to upgrade the broker/consumer to 0.8.2.2, or to a 
later Snappy version, to avoid this bug? Currently our brokers are on 0.8.2.1 
and Snappy 1.1.1.6.


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-08 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618873#comment-14618873
 ] 

Gwen Shapira commented on KAFKA-2308:
-

Yes, I saw the Snappy test case too :)

Since it's a confirmed Snappy bug, I don't think we need a Kafka test case. We 
can just protect that call, right?


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-08 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618867#comment-14618867
 ] 

Ewen Cheslack-Postava commented on KAFKA-2308:
--

[~gwenshap] The test case you gave doesn't quite do enough to trigger the bug. 
It releases the same buffer twice, but doesn't reuse it. I think you'd need to 
get the test to do something more like the following (see the sketch after the 
list):

* Fill the first record batch (batch 1) with records and drain it (causing its 
buffer to be released).
* At least start creating another batch (batch 2). This allocates the buffer to 
that batch.
* Re-enqueue batch 1 and drain it again (causing the buffer to be released a 
second time).
* Continue enqueuing until it creates *another* batch (batch 3), which 
allocates the buffer yet again.
* Drain batches 2 and 3 and validate their contents.
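
The danger in that sequence is easiest to see with a toy free-list pool. The 
sketch below is illustrative only (it is not the producer's actual BufferPool 
or RecordAccumulator code); it just shows how releasing the same buffer twice 
hands it to two live batches at once:

{code}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Toy free list: a double release puts two references to the same buffer on
// the list, so two subsequent allocations (batches 2 and 3) end up sharing
// one buffer and can overwrite each other's compressed contents.
public class DoubleReleaseDemo {
    public static void main(String[] args) {
        Deque<ByteBuffer> free = new ArrayDeque<>();
        ByteBuffer batch1Buffer = ByteBuffer.allocate(64);

        free.addLast(batch1Buffer);                  // batch 1 drained -> buffer released
        ByteBuffer batch2Buffer = free.pollFirst();  // batch 2 allocates the buffer

        free.addLast(batch1Buffer);                  // re-enqueued batch 1 drained -> released again
        ByteBuffer batch3Buffer = free.pollFirst();  // batch 3 allocates it yet again

        System.out.println(batch2Buffer == batch3Buffer); // true: two batches, one buffer
    }
}
{code}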


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618921#comment-14618921
 ] 

Guozhang Wang commented on KAFKA-2308:
--

I was unit testing the patch while writing the last comment :) Just shipped it 
and committed to trunk.


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1461#comment-1461
 ] 

Guozhang Wang commented on KAFKA-2308:
--

Agreed, we do not need a test case inside the Kafka code.


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-08 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618901#comment-14618901
 ] 

Gwen Shapira commented on KAFKA-2308:
-

Was that a ship it, [~guozhang]? :)


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-08 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618981#comment-14618981
 ] 

Gwen Shapira commented on KAFKA-2308:
-

Thanks :)


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-07 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617206#comment-14617206
 ] 

Guozhang Wang commented on KAFKA-2308:
--

Hi [~gwenshap], thanks for the findings. Yes, for a compressed message set the 
close call will trigger Compressor.close(), which:

1. closes the compression output stream, which will likely write the left-over 
cached bytes to the underlying buffer.
2. sets the wrapper message header accordingly, such as the offset (set to the 
number of compressed messages - 1), the length, and the CRC.

The second step should be OK to execute twice (and if it weren't, gzip would 
have a similar issue), but for the first step, calling stream.close() multiple 
times on Snappy may be problematic. To verify that, we can write some simple 
test code:

{code}
java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
java.io.OutputStream stream = new org.xerial.snappy.SnappyOutputStream(buffer, 32 * 1024);
stream.write("some bytes".getBytes()); // write some bytes to the stream
stream.close();
stream.close(); // close a second time
// decompressing buffer.toByteArray() with org.xerial.snappy.SnappyInputStream
// should give back "some bytes" if the double close is harmless
{code}


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-07 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617096#comment-14617096
 ] 

Gwen Shapira commented on KAFKA-2308:
-

I roughly know why it happens; a patch will take a bit of time, since I can't 
reproduce the error in a unit test (although it reproduces nicely in a test 
environment). I'll put up a preliminary patch without unit tests shortly, so 
people suffering from this issue can validate it. Here's what I found:

When the producer gets a retriable error (NETWORK_EXCEPTION, for instance), 
completeBatch() re-enqueues the current record batch at the head of its 
topic-partition batch queue.
The next time Sender runs, it drains the queue, and one of the things it does 
is take the first batch from the queue and close() it. But if a batch was 
re-queued, it was already closed. Calling close() twice should be safe, and for 
uncompressed messages it is. However, for compressed messages the logic in 
close() is rather complex, and I believe closing a batch twice corrupts the 
record. I can't tell exactly where the close() logic becomes unsafe, but there 
is really no need to close a batch twice: MemoryRecords.close() can check 
whether it is still writable and only close the record if it is. That 
guarantees closing happens exactly once.

Fixing this resolved the problem on my system.
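
A minimal sketch of the idempotent close() guard described above, with 
illustrative names only (this is not the actual MemoryRecords code):

{code}
import java.io.IOException;
import java.io.OutputStream;

// Illustrative records buffer whose close() is safe to call more than once.
class InMemoryRecords {
    private final OutputStream compressedOut; // e.g. a SnappyOutputStream over the batch buffer
    private boolean writable = true;

    InMemoryRecords(OutputStream compressedOut) {
        this.compressedOut = compressedOut;
    }

    public void close() throws IOException {
        if (!writable)
            return;            // a second close() on a re-queued batch is a no-op
        compressedOut.close(); // flush remaining compressed bytes and finalize the batch exactly once
        writable = false;
    }
}
{code}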




[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-07 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617878#comment-14617878
 ] 

Gwen Shapira commented on KAFKA-2308:
-

Thanks for your comments, [~guozhang].

Closing multiple times on Snappy is not an issue. Actually, even draining, 
re-queueing, and draining again is not an issue by itself.
That is, I'm still unable to create a test that replicates this error, even 
though it reproduces nicely in a real cluster with the performance producer.

I have a patch that I'm fairly certain fixes the problem (although I cannot say 
why). I'll attach it here, because someone may need it, and I will continue 
digging into when and why the double close corrupts messages.


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-07 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617887#comment-14617887
 ] 

Gwen Shapira commented on KAFKA-2308:
-

Created reviewboard https://reviews.apache.org/r/36290/diff/
 against branch trunk


[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart

2015-07-07 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617949#comment-14617949
 ] 

Gwen Shapira commented on KAFKA-2308:
-

Actually, it looks likely that this is a Snappy bug (even though I can't reproduce it):
https://github.com/xerial/snappy-java/pull/108

Note that this fix is not in 1.1.1.7 (which we are using).

I suggest pushing our simple work-around, since it is simple and nothing bad can happen from only closing the stream once.
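
For illustration, here is a minimal sketch of that kind of "close only once" guard (hypothetical wrapper class, not the actual KAFKA-2308 patch): a boolean flag makes close() idempotent, so a batch that gets closed again (for example on retry after a broker bounce) can no longer flush the Snappy framing a second time and corrupt the compressed payload.

{code}
import java.io.IOException;
import java.io.OutputStream;

import org.xerial.snappy.SnappyOutputStream;

/**
 * Illustrative sketch only (hypothetical class name, not the actual KAFKA-2308 patch):
 * wraps a snappy-java SnappyOutputStream and makes close() idempotent, since closing
 * the Snappy stream twice in affected snappy-java versions can append extra framing
 * bytes and produce data the broker cannot decompress.
 */
public class CloseOnceSnappyOutputStream extends OutputStream {

    private final SnappyOutputStream delegate;
    private boolean closed = false;

    public CloseOnceSnappyOutputStream(OutputStream sink) throws IOException {
        this.delegate = new SnappyOutputStream(sink);
    }

    @Override
    public void write(int b) throws IOException {
        delegate.write(b);
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        delegate.write(buf, off, len);
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    @Override
    public void close() throws IOException {
        // The work-around: only the first close() does anything; any later
        // close() (e.g. when a batch is closed again on retry) is a no-op.
        if (closed) {
            return;
        }
        closed = true;
        delegate.close();
    }
}
{code}

The same flag-based guard can live on whatever object owns the compressed stream in the producer; the point is simply that a second close becomes harmless.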
