[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15037210#comment-15037210 ]

ASF GitHub Bot commented on KAFKA-2308:
---

Github user darionyaphet commented on the pull request:

https://github.com/apache/storm/pull/801#issuecomment-161509467

Hi @knusbaum @revans2, I read the `Kafka Release Notes Version 0.8.2.2` and found a fixed bug ([KAFKA-2308](https://issues.apache.org/jira/browse/KAFKA-2308)) about the new producer and Snappy decompression errors after a Kafka broker restart. So I think this may be useful.

> New producer + Snappy face un-compression errors after broker restart
>
> Key: KAFKA-2308
> URL: https://issues.apache.org/jira/browse/KAFKA-2308
> Project: Kafka
> Issue Type: Bug
> Reporter: Gwen Shapira
> Assignee: Gwen Shapira
> Fix For: 0.9.0.0, 0.8.2.2
>
> Attachments: KAFKA-2308.patch
>
> Looks like the new producer, when used with Snappy, sends messages the brokers can't decompress following a broker restart. This issue was discussed on a few mailing list threads, but I don't think we ever resolved it.
> I can reproduce with trunk and Snappy 1.1.1.7.
> To reproduce:
> 1. Start 3 brokers.
> 2. Create a topic with 3 partitions and 3 replicas each.
> 3. Start the performance producer with --new-producer --compression-codec 2 (and set the number of messages fairly high, to give you time. I went with 10M).
> 4. Bounce one of the brokers.
> 5. The log of one of the surviving nodes should contain errors like:
> {code}
> 2015-07-02 13:45:59,300 ERROR kafka.server.ReplicaManager: [Replica Manager on Broker 66]: Error processing append operation on partition [t3,0]
> kafka.common.KafkaException:
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:94)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:64)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>     at kafka.message.ByteBufferMessageSet$$anon$2.innerDone(ByteBufferMessageSet.scala:177)
>     at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:218)
>     at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:173)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>     at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:267)
>     at kafka.log.Log.liftedTree1$1(Log.scala:327)
>     at kafka.log.Log.append(Log.scala:326)
>     at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:423)
>     at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:409)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
>     at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:409)
>     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
>     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350)
>     at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286)
>     at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:270)
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034497#comment-15034497 ]

Guozhang Wang commented on KAFKA-2308:
--

[~allenxwang] This bug should only be in the producer, due to its usage pattern of Snappy.
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034439#comment-15034439 ]

Allen Wang commented on KAFKA-2308:
---

[~gwenshap] [~guozhang] Is the fix in the producer only? If I take the 0.8.2.2 producer, do I also need to upgrade the broker/consumer to 0.8.2.2 or to a later Snappy version in order to avoid this bug? Currently our broker is on 0.8.2.1 and Snappy 1.1.1.6.
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14618873#comment-14618873 ]

Gwen Shapira commented on KAFKA-2308:
-

Yes, I saw the Snappy test case too :) Since it's a confirmed Snappy bug, I don't think we need a Kafka test case. We can just protect that call, right?
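One way to read "protect that call" is to make the close that returns a buffer idempotent, so a batch that is drained a second time (for example after a retry during a broker bounce) cannot hand the same buffer back to the pool twice. The following is a minimal Python model under that assumption only. `BufferPool` and `GuardedBatch` are hypothetical names standing in for the pooled buffers on the producer/Snappy path. They are not Kafka or snappy-java APIs, and this is not the committed KAFKA-2308.patch.

```python
class BufferPool:
    """Hypothetical pool of reusable buffers (not a Kafka or snappy-java class)."""

    def __init__(self):
        self._free = []

    def allocate(self):
        # Hand out a previously released buffer if there is one, wiped for reuse.
        if self._free:
            buf = self._free.pop()
            buf.clear()
            return buf
        return bytearray()

    def release(self, buf):
        # Note: no check for a buffer that is already in the free list.
        self._free.append(buf)


class GuardedBatch:
    """Hypothetical batch whose close() returns its buffer at most once."""

    def __init__(self, pool):
        self.pool = pool
        self.buf = pool.allocate()
        self.closed = False

    def append(self, record):
        if self.closed:
            raise RuntimeError("batch is already closed")
        self.buf.extend(record)

    def close(self):
        # The guard: a second close (e.g. a retried batch drained again) is a no-op.
        if self.closed:
            return
        self.closed = True
        self.pool.release(self.buf)


pool = BufferPool()
batch1 = GuardedBatch(pool)
batch1.append(b"r1")
batch1.close()                   # first close returns the buffer to the pool
batch2 = GuardedBatch(pool)      # reuses batch 1's buffer
batch2.append(b"b2")
batch1.close()                   # second close is ignored
batch3 = GuardedBatch(pool)      # pool is empty, so batch 3 gets a fresh buffer
batch3.append(b"b3")
print(batch2.buf is batch3.buf)  # False
print(bytes(batch2.buf), bytes(batch3.buf))  # b'b2' b'b3'
```

The pool itself still does not detect double releases. The guard therefore helps only if every caller goes through the guarded close, which is the trade-off of protecting the call site rather than fixing the pool.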
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14618867#comment-14618867 ]

Ewen Cheslack-Postava commented on KAFKA-2308:
--

[~gwenshap] The test case you gave doesn't quite do enough to trigger the bug. It releases the same buffer twice, but doesn't reuse it. I think you'd need to get the test to do something more like:
* Fill the first record batch (batch 1) with records and drain it (causing its buffer to be released).
* At least start creating another batch (batch 2). This allocates the buffer to that batch.
* Re-enqueue batch 1 and drain it (causing the buffer to be released a second time).
* Continue enqueuing until it creates *another* batch (batch 3), which allocates the buffer yet again.
* Drain batches 2 and 3 and validate their contents.
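Ewen's five steps can be checked against a toy model. This minimal Python sketch makes two assumptions: draining a batch closes it, and closing returns the batch's buffer to a shared pool without checking for an earlier release. `BufferPool` and `Batch` are hypothetical stand-ins, not Kafka or snappy-java classes.

```python
class BufferPool:
    """Hypothetical pool of reusable buffers (not a Kafka or snappy-java class)."""

    def __init__(self):
        self._free = []

    def allocate(self):
        # Reuse a released buffer if available, wiping it as a new writer would.
        if self._free:
            buf = self._free.pop()
            buf.clear()
            return buf
        return bytearray()

    def release(self, buf):
        # No double-release check, so the same buffer can be queued twice.
        self._free.append(buf)


class Batch:
    """Hypothetical batch whose close() releases its buffer on every call."""

    def __init__(self, pool):
        self.pool = pool
        self.buf = pool.allocate()

    def append(self, record):
        self.buf.extend(record)

    def close(self):
        self.pool.release(self.buf)


pool = BufferPool()

# Fill batch 1 and drain it: first release of its buffer.
batch1 = Batch(pool)
batch1.append(b"r1")
batch1.close()

# Start batch 2: it is handed the buffer batch 1 just released.
batch2 = Batch(pool)
batch2.append(b"b2")

# Re-enqueue batch 1 and drain it again: the same buffer is released a second time.
batch1.close()

# Keep appending until batch 3 is created: it is handed that buffer yet again.
batch3 = Batch(pool)
batch3.append(b"b3")

# Drain batches 2 and 3 and validate: batch 2's contents were overwritten.
print(batch2.buf is batch3.buf)  # True
print(bytes(batch2.buf))         # b'b3'
```

In this model, batch 2's data is silently replaced by batch 3's. In the real producer, an analogous overlap would corrupt compressed bytes, which is consistent with the broker-side decompression errors in the issue description.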
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14618921#comment-14618921 ]

Guozhang Wang commented on KAFKA-2308:
--

I was unit testing the patch while writing the last comment :) Just shipped it and committed to trunk.
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=1461#comment-1461 ]

Guozhang Wang commented on KAFKA-2308:
--

Agree, we do not need a test case inside Kafka code.
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14618901#comment-14618901 ] Gwen Shapira commented on KAFKA-2308:
Was that a ship it, [~guozhang]? :)

New producer + Snappy face un-compression errors after broker restart - Key: KAFKA-2308 URL: https://issues.apache.org/jira/browse/KAFKA-2308 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2308.patch
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14618981#comment-14618981 ] Gwen Shapira commented on KAFKA-2308:
Thanks :)

New producer + Snappy face un-compression errors after broker restart - Key: KAFKA-2308 URL: https://issues.apache.org/jira/browse/KAFKA-2308 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-2308.patch
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14617206#comment-14617206 ] Guozhang Wang commented on KAFKA-2308:
Hi [~gwenshap], thanks for the findings. Yes, for a compressed message set the close call triggers Compressor.close(), which:
1. closes the compression output stream, which will likely flush the left-over buffered bytes to the underlying buffer;
2. sets the wrapper message header accordingly, such as the offset (the number of compressed messages - 1), the length, and the CRC.
For the second step I think it should be OK to execute twice; besides, if it were not, gzip should show a similar issue. For the first step, however, calling stream.close() multiple times on Snappy may be problematic. To verify that, we can write some simple test code:
{code}
import java.io.ByteArrayOutputStream;
import org.xerial.snappy.SnappyOutputStream;
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
SnappyOutputStream stream = new SnappyOutputStream(buffer, 32 * 1024); // block size (illustrative value)
stream.write(new byte[] {1, 2, 3}); // write some bytes to the stream
stream.close();
stream.close(); // again
{code}

New producer + Snappy face un-compression errors after broker restart - Key: KAFKA-2308 URL: https://issues.apache.org/jira/browse/KAFKA-2308 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira
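The two steps Guozhang lists can be modelled in a small, self-contained sketch. This is a hypothetical illustration, not Kafka's actual Compressor: `CompressorSketch`, `wrapperOffset`, `wrapperLength`, `wrapperCrc` and `decompressedPayload()` are made-up names, GZIP from `java.util.zip` stands in for Snappy (snappy-java is not part of the JDK), and the "header" is simplified to offset = number of records - 1, length = compressed byte count, and a CRC32 over the compressed payload only (Kafka's real wrapper message format differs). It shows why the second step is harmless to repeat, and that `GZIPOutputStream.close()` does nothing once the stream is already closed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.CRC32;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical, simplified model of the two steps of Compressor.close(); GZIP stands in for Snappy. */
final class CompressorSketch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final GZIPOutputStream stream;
    private int numRecords = 0;

    // Simplified wrapper "header" fields, (re)computed by close().
    long wrapperOffset = -1;
    int wrapperLength = -1;
    long wrapperCrc = -1;

    CompressorSketch() {
        try {
            stream = new GZIPOutputStream(buffer); // writes the GZIP header into buffer
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void append(byte[] record) {
        try {
            stream.write(record);
            numRecords++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void close() {
        try {
            // Step 1: closing the compression output stream flushes left-over buffered bytes
            // into the underlying buffer; GZIPOutputStream.close() is a no-op once closed.
            stream.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Step 2: recompute the simplified wrapper header from the same bytes.
        // Running this twice yields identical values.
        byte[] payload = buffer.toByteArray();
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        wrapperOffset = numRecords - 1;
        wrapperLength = payload.length;
        wrapperCrc = crc.getValue();
    }

    /** Decompresses the buffer to check that it is still a valid GZIP stream. */
    byte[] decompressedPayload() {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With gzip, closing twice leaves the header values and the decompressed contents unchanged, which matches the reasoning that a non-idempotent close would have to come from the codec's own stream rather than from the header step.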
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14617096#comment-14617096 ] Gwen Shapira commented on KAFKA-2308:
I kinda know why it happens. A patch will take a bit of time, since I can't reproduce the error in a unit test (although it reproduces nicely in a test environment). I'll put up a preliminary patch without unit tests in a sec, so people suffering from this issue can validate it.

Here's what I found: when the producer gets a retriable error (NETWORK_EXCEPTION, for instance), completeBatch() puts the current record batch back at the front of its topic-partition batch queue. The next time the Sender runs, it drains the queue, and one of the things it does is take the first batch from the queue and close() it. But if a batch was re-queued, it was already closed.

Calling close() twice should be safe, and for uncompressed messages it is. For compressed messages, however, the logic in close() is rather complex, and I believe closing a batch twice corrupts the record. I can't tell exactly where the close() logic becomes unsafe, but there's really no need to close a batch twice: MemoryRecords.close() can check whether it is writable and only close() the records if it is. This guarantees that closing happens just once. Fixing this resolved the problem on my system.

New producer + Snappy face un-compression errors after broker restart - Key: KAFKA-2308 URL: https://issues.apache.org/jira/browse/KAFKA-2308 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira
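The re-queue path and the proposed "close only if writable" guard can be sketched in a few lines. This is a minimal, hypothetical model, not Kafka's actual MemoryRecords or Sender code: `BatchSketch`, `RetryFlowSketch`, `drainAndSend` and `compressorCloseCount` are illustrative names, and incrementing a counter stands in for closing the real compressor. A batch that is drained, fails with a retriable error, goes back to the head of the queue, and is drained again reaches the compressor's close() only once.

```java
import java.util.Deque;

/** Hypothetical stand-in for a record batch whose close() must run only once. */
final class BatchSketch {
    private boolean writable = true;
    int compressorCloseCount = 0; // how many times the underlying compressor was closed

    /** Guarded close: only the first call reaches the compressor. */
    void close() {
        if (!writable) {
            return; // already closed, e.g. because the batch was re-queued after a retriable error
        }
        compressorCloseCount++; // stands in for compressor.close()
        writable = false;
    }

    boolean isWritable() {
        return writable;
    }
}

/** Hypothetical model of the Sender's drain / re-queue cycle. */
final class RetryFlowSketch {
    /** Drains the head batch and closes it; if the send fails, puts the same batch back first in the queue. */
    static BatchSketch drainAndSend(Deque<BatchSketch> queue, boolean sendFails) {
        BatchSketch batch = queue.pollFirst();
        batch.close(); // every drained batch is closed, including one that was closed before
        if (sendFails) {
            queue.addFirst(batch); // retriable error (e.g. NETWORK_EXCEPTION): re-queue the already-closed batch
        }
        return batch;
    }
}
```

The guard lives in the batch rather than in each codec, so it does not depend on whether a given compression stream tolerates a second close().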
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14617878#comment-14617878 ] Gwen Shapira commented on KAFKA-2308:
Thanks for your comments, [~guozhang]. Closing a Snappy stream multiple times is not an issue. Actually, even draining, re-queueing and draining again is not an issue by itself: I'm still unable to create a test that replicates this error, even though it reproduces nicely in a real cluster with the performance producer. I have a patch that I'm fairly certain fixes the problem (although I cannot say why). I'll attach it here, because someone may need it, and continue digging into when and why a double close corrupts messages.

New producer + Snappy face un-compression errors after broker restart - Key: KAFKA-2308 URL: https://issues.apache.org/jira/browse/KAFKA-2308 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14617887#comment-14617887 ] Gwen Shapira commented on KAFKA-2308:
Created reviewboard https://reviews.apache.org/r/36290/diff/ against branch trunk

New producer + Snappy face un-compression errors after broker restart - Key: KAFKA-2308 URL: https://issues.apache.org/jira/browse/KAFKA-2308 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2308.patch
[jira] [Commented] (KAFKA-2308) New producer + Snappy face un-compression errors after broker restart
[ https://issues.apache.org/jira/browse/KAFKA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14617949#comment-14617949 ] Gwen Shapira commented on KAFKA-2308:
Actually, it looks likely that the problem is in Snappy (even though I can't reproduce it in a unit test): https://github.com/xerial/snappy-java/pull/108
Note that this change is not in 1.1.1.7, which we are using. I suggest pushing our simple workaround, since it's simple and nothing bad can happen from closing only once.

New producer + Snappy face un-compression errors after broker restart - Key: KAFKA-2308 URL: https://issues.apache.org/jira/browse/KAFKA-2308 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2308.patch