[ 
https://issues.apache.org/jira/browse/KAFKA-3594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15251692#comment-15251692
 ] 

Manikumar Reddy edited comment on KAFKA-3594 at 4/21/16 10:43 AM:
------------------------------------------------------------------

Thanks for reporting this. I reproduced the error by creating producer retry 
scenario. 
Below exception is occurring when producer is trying to append a record to a 
Re-enqueued record batch in the accumulator.  We should not allow to add a 
record to Re-enqueued record batch. This is happening due a bug in 
MemoryRecords.java/hasRooFor() method. After calling MemoryRecords.close() 
method, hasRooFor() method should return false. I will submit patch soon.

{code}
java.lang.IllegalStateException: Memory records is not writable
        at 
org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:95)
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:70)
        at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:176)
        at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:467)
        at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
{code}


was (Author: omkreddy):
Thanks for reporting this. I reproduced the error by creating producer retry 
scenario. 
Below exception is occurring when producer is trying to append a record to a 
Re-enqueued record batch in the accumulator.
We should not allow to add a record to Re-enqueued record batch. This is 
happening due a bug in MemoryRecords.java/hasRooFor()
method. After calling MemoryRecords.close() method, hasRooFor() method should 
return false. I will submit patch soon.

{code}
java.lang.IllegalStateException: Memory records is not writable
        at 
org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:95)
        at 
org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:70)
        at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:176)
        at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:467)
        at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
{code}

> Kafka new producer retries doesn't work in 0.9.0.1
> --------------------------------------------------
>
>                 Key: KAFKA-3594
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3594
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.9.0.1
>         Environment: Debian 7.8 Wheezy / Kafka 0.9.0.1 (Confluent 2.0.1)
>            Reporter: Nicolas PHUNG
>            Assignee: Manikumar Reddy
>            Priority: Critical
>              Labels: kafka, new, producer, replication, retry
>             Fix For: 0.10.0.0
>
>
> Hello,
> I'm encountering an issue with the new Producer on 0.9.0.1 client with a 
> 0.9.0.1 Kafka broker when Kafka broker are offline for example. It seems the 
> retries doesn't work anymore and I got the following error logs :
> {noformat}
> play.api.Application$$anon$1: Execution exception[[IllegalStateException: 
> Memory records is not writable]]
>         at play.api.Application$class.handleError(Application.scala:296) 
> ~[com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
>         at play.api.DefaultApplication.handleError(Application.scala:402) 
> [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
>         at 
> play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320)
>  [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
>         at 
> play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320)
>  [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
>         at scala.Option.map(Option.scala:146) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3.applyOrElse(PlayDefaultUpstreamHandler.scala:320)
>  [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
>         at 
> play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3.applyOrElse(PlayDefaultUpstreamHandler.scala:316)
>  [com.typesafe.play.play_2.11-2.3.10.jar:2.3.10]
>         at 
> scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:346) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:345) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> play.api.libs.iteratee.Execution$trampoline$.execute(Execution.scala:46) 
> [com.typesafe.play.play-iteratees_2.11-2.3.10.jar:2.3.10]
>         at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at scala.concurrent.Promise$class.complete(Promise.scala:55) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>  [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
>         at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>  [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
>         at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>  [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
>         at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>  [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
>         at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) 
> [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) 
> [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
>         at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>  [com.typesafe.akka.akka-actor_2.11-2.3.4.jar:na]
>         at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [org.scala-lang.scala-library-2.11.8.jar:na]
> Caused by: java.lang.IllegalStateException: Memory records is not writable
>         at 
> org.apache.kafka.common.record.MemoryRecords.append(MemoryRecords.java:93) 
> ~[org.apache.kafka.kafka-clients-0.9.0.1-cp1.jar:na]
>         at 
> org.apache.kafka.clients.producer.internals.RecordBatch.tryAppend(RecordBatch.java:69)
>  ~[org.apache.kafka.kafka-clients-0.9.0.1-cp1.jar:na]
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:168)
>  ~[org.apache.kafka.kafka-clients-0.9.0.1-cp1.jar:na]
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:435) 
> ~[org.apache.kafka.kafka-clients-0.9.0.1-cp1.jar:na]
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339) 
> ~[org.apache.kafka.kafka-clients-0.9.0.1-cp1.jar:na]
>         at 
> services.KafkaProducerService.sendToKafka(KafkaProducerService.scala:136) 
> ~[fr.figarocms.tracker-fca-nextgen-2.58.jar:2.58]
>         at 
> services.KafkaProducerService$$anonfun$send$1.apply(KafkaProducerService.scala:55)
>  ~[fr.figarocms.tracker-fca-nextgen-2.58.jar:2.58]
>         at 
> services.KafkaProducerService$$anonfun$send$1.apply(KafkaProducerService.scala:55)
>  ~[fr.figarocms.tracker-fca-nextgen-2.58.jar:2.58]
>         at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  ~[org.scala-lang.scala-library-2.11.8.jar:na]
>         at 
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) 
> ~[org.scala-lang.scala-library-2.11.8.jar:na]
> {noformat}
> We tried the same various breakdown (kafka(s), zookeeper) with 0.8.2.2 client 
> and Kafka broker 0.8.2.2 and the retries work as expected on the older 
> version. 
> We tested this with 3 brokers with a replication factor 3 and in sync replica 
> 2. The error log appear when we got only one broker left here on 0.9.0.1. Can 
> this be related to KAFKA-3147 fix ?
> Regards,
> Nicolas PHUNG



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to