[jira] [Commented] (KAFKA-4914) Partition re-assignment tool should check types before persisting state in ZooKeeper

2018-02-08 Thread Nick Travers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357221#comment-16357221
 ] 

Nick Travers commented on KAFKA-4914:
-------------------------------------

[~damianguy] - noticed this got updated recently. I've had a PR open for a 
while now (coming up on a year).

[~ijuma] has been helping me iterate on it. Would love to get that merged if 
possible!

> Partition re-assignment tool should check types before persisting state in 
> ZooKeeper
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4914
> URL: https://issues.apache.org/jira/browse/KAFKA-4914
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>Assignee: Nick Travers
>Priority: Major
> Fix For: 2.0.0
>
>
> The partition-reassignment tool currently allows non-type-safe information to 
> be persisted into ZooKeeper, which can result in a ClassCastException at 
> runtime for brokers.
> Specifically, this occurred when the broker assignment field was a List of 
> Strings, instead of a List of Integers.
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener 
> on Controller 10]: Error while handling broker changes
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at 
> kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436)
> at 
> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
> at scala.collection.immutable.List.exists(List.scala:84)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435)
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> at 
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> at 
> kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}
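For illustration, here's a minimal sketch (object and method names are hypothetical; this is not the linked PR) of the kind of pre-flight type check the tool could run before writing anything to ZooKeeper, assuming Jackson is used for parsing as elsewhere in Kafka:

{code:scala}
import scala.collection.JavaConverters._

import com.fasterxml.jackson.databind.ObjectMapper

object ReassignmentJsonCheck {
  private val mapper = new ObjectMapper()

  // Fail at submission time if any "replicas" entry is not an integer,
  // instead of persisting the bad assignment and surfacing it later as a
  // ClassCastException on the controller.
  def validate(json: String): Unit = {
    val root = mapper.readTree(json)
    for (partition <- root.path("partitions").elements().asScala;
         replica <- partition.path("replicas").elements().asScala) {
      if (!replica.isInt)
        throw new IllegalArgumentException(
          s"non-integer replica $replica for partition ${partition.path("partition")}")
    }
  }
}
{code}

With input like {"replicas": ["1", "2"]}, validate would reject the request up front rather than letting a List of Strings reach ZooKeeper.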



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2018-02-06 Thread Nick Travers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354259#comment-16354259
 ] 

Nick Travers commented on KAFKA-4669:
-------------------------------------

Chiming in again to note that we're still running into this issue 
intermittently. The failure mode is the same, with a BufferUnderflowException 
and a stack trace similar to what I posted above.

For some additional context: when this occurs, it ultimately leads to a JVM 
that cannot exit, as it is waiting on a latch that will never be counted 
down. Here's the hung thread:
{code:java}
"async-message-sender-0" #120 daemon prio=5 os_prio=0 tid=0x7f30b4003000 
nid=0x195a1 waiting on condition [0x7f3105ce1000]
   java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@9/Native Method)
- parking to wait for  <0x0007852b1b68> (a 
java.util.concurrent.CountDownLatch$Sync)
at 
java.util.concurrent.locks.LockSupport.park(java.base@9/LockSupport.java:194)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@9/AbstractQueuedSynchronizer.java:871)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1024)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1331)
at 
java.util.concurrent.CountDownLatch.await(java.base@9/CountDownLatch.java:232)
at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at com.squareup.core.concurrent.Futures2.getAll(Futures2.java:462)
at 
com.squareup.kafka.ng.producer.KafkaProducer.sendMessageBatch(KafkaProducer.java:214)
- locked <0x000728c71998> (a 
com.squareup.kafka.ng.producer.KafkaProducer)
at 
com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.sendBatch(BufferedKafkaProducer.java:585)
at 
com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.run(BufferedKafkaProducer.java:536)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@9/ThreadPoolExecutor.java:1167)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@9/ThreadPoolExecutor.java:641)
at java.lang.Thread.run(java.base@9/Thread.java:844)
{code}
[Here is the 
latch|https://github.com/apache/kafka/blob/0.11.0.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java#L34]
 that is still open in the ProduceRequestResult. I assume that the network 
thread is responsible for counting it down, but if that thread crashes for 
whatever reason, it never gets a chance to call CountDownLatch#countDown.

Arguably, we should probably be using a combination of daemon threads and the 
timed version of Future#get, but it _feels_ like something that could be fixed 
in the producer client, even if it's just for the sake of ensuring that failed 
ProduceRequestResults can eventually be GC'd, which can't happen while another 
thread is hung waiting on the latch.
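As a rough sketch of that caller-side workaround (a hypothetical wrapper; the 30-second timeout is illustrative):

{code:scala}
import java.util.concurrent.{Future, TimeUnit, TimeoutException}

import org.apache.kafka.clients.producer.RecordMetadata

object BoundedSendAwait {
  // Bound the wait on each send future so a crashed I/O thread cannot park
  // the calling thread forever.
  def await(future: Future[RecordMetadata]): RecordMetadata =
    try future.get(30, TimeUnit.SECONDS)
    catch {
      case e: TimeoutException =>
        // The ProduceRequestResult latch was never counted down; fail loudly
        // instead of blocking indefinitely.
        throw new IllegalStateException("send did not complete in time", e)
    }
}
{code}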

cc: [~rsivaram] [~hachikuji]

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug exists in the 0.10 versions as well.
> A real case: first, a correlation-id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at 

[jira] [Commented] (KAFKA-6084) ReassignPartitionsCommand should propagate JSON parsing failures

2017-12-17 Thread Nick Travers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16294278#comment-16294278
 ] 

Nick Travers commented on KAFKA-6084:
-------------------------------------

I linked this to KAFKA-4914. I believe that the [proposed 
fix|https://github.com/apache/kafka/pull/2708] there should fix this issue 
too.

> ReassignPartitionsCommand should propagate JSON parsing failures
> ---------------------------------------------------------------------------
>
> Key: KAFKA-6084
> URL: https://issues.apache.org/jira/browse/KAFKA-6084
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.11.0.0
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Minor
>  Labels: easyfix, newbie
> Attachments: Screen Shot 2017-10-18 at 23.31.22.png
>
>
> Basically, looking at Json.scala, it always swallows any parsing errors:
> {code}
>   def parseFull(input: String): Option[JsonValue] =
> try Option(mapper.readTree(input)).map(JsonValue(_))
> catch { case _: JsonProcessingException => None }
> {code}
> However, while it is sometimes easy to figure out the problem by simply looking 
> at the JSON, in other cases it is not trivial: some invisible characters (like 
> a byte order mark) won't be displayed by most text editors, and people can 
> spend a long time figuring out what the problem is.
> As Jackson provides a really detailed exception about what failed and how, it 
> is easy to propagate the failure to the user.
> As an example, I attached a BOM-prefixed JSON file which fails with the 
> following very counterintuitive error:
> {noformat}
> [root@localhost ~]# kafka-reassign-partitions --zookeeper localhost:2181 
> --reassignment-json-file /root/increase-replication-factor.json --execute
> Partitions reassignment failed due to Partition reassignment data file 
> /root/increase-replication-factor.json is empty
> kafka.common.AdminCommandFailedException: Partition reassignment data file 
> /root/increase-replication-factor.json is empty
> at 
> kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:120)
> at 
> kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:52)
> at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
> ...
> {noformat}
> For the above error, it would be much better to see exactly what fails:
> {noformat}
> kafka.common.AdminCommandFailedException: Admin command failed
>   at 
> kafka.admin.ReassignPartitionsCommand$.parsePartitionReassignmentData(ReassignPartitionsCommand.scala:267)
>   at 
> kafka.admin.ReassignPartitionsCommand$.parseAndValidate(ReassignPartitionsCommand.scala:275)
>   at 
> kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:197)
>   at 
> kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:193)
>   at 
> kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:64)
>   at 
> kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
> Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected 
> character ('' (code 65279 / 0xfeff)): expected a valid value (number, 
> String, array, object, 'true', 'false' or 'null')
>  at [Source: (String)"{"version":1,
>   "partitions":[
>{"topic": "test1", "partition": 0, "replicas": [1,2]},
>{"topic": "test2", "partition": 1, "replicas": [2,3]}
> ]}"; line: 1, column: 2]
>   at 
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1798)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:663)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:561)
>   at 
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1892)
>   at 
> com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:747)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4030)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2539)
>   at kafka.utils.Json$.kafka$utils$Json$$doParseFull(Json.scala:46)
>   at kafka.utils.Json$$anonfun$tryParseFull$1.apply(Json.scala:44)
>   at kafka.utils.Json$$anonfun$tryParseFull$1.apply(Json.scala:44)
>   at scala.util.Try$.apply(Try.scala:192)
>   at kafka.utils.Json$.tryParseFull(Json.scala:44)
>   at 
> kafka.admin.ReassignPartitionsCommand$.parsePartitionReassignmentData(ReassignPartitionsCommand.scala:241)
>   ... 5 more
> {noformat}
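For reference, here's a minimal sketch of the Try-based shape that the desired output above implies (class name illustrative; not necessarily the committed patch), using the same Jackson ObjectMapper:

{code:scala}
import scala.util.{Failure, Success, Try}

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object JsonStrict {
  private val mapper = new ObjectMapper()

  // Wrap readTree in Try so Jackson's JsonProcessingException, which names
  // the offending character and position, reaches the caller instead of
  // being collapsed into None.
  def tryParseFull(input: String): Try[JsonNode] =
    Try(mapper.readTree(input))

  def main(args: Array[String]): Unit =
    tryParseFull("\uFEFF{\"version\": 1}") match {
      case Success(node) => println(node)
      case Failure(e)    => println(s"parse failed: ${e.getMessage}")
    }
}
{code}

Run against the attached BOM-prefixed file, the Failure branch carries Jackson's "Unexpected character (code 65279 / 0xfeff)" message shown above instead of the misleading "data file is empty" error.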



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-11-30 Thread Nick Travers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272900#comment-16272900
 ] 

Nick Travers commented on KAFKA-4669:
-------------------------------------

[~rsivaram] [~hachikuji] - I did some digging on the broker side. While we 
usually see a steady stream of exceptions in the broker logs due to oversized 
messages, here's another exception from around the time we saw the exception 
in the producer client.

There were over 100 of these exceptions on the same broker network thread 
within the same second. A few milliseconds after this, the exception was seen 
in the producer client. It's hard to say anything definitive about the 
ordering, given clock skew between the different broker and client hosts.

{code}
2017-11-28 09:18:20,599 
kafka-network-thread-28-ListenerName(PLAINTEXT)-PLAINTEXT-4 Processor got 
uncaught exception.
java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:506)
at java.nio.HeapByteBuffer.getShort(HeapByteBuffer.java:310)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:70)
at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:518)
at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:511)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
kafka.network.Processor.processCompletedReceives(SocketServer.scala:511)
at kafka.network.Processor.run(SocketServer.scala:436)
at java.lang.Thread.run(Thread.java:748)
{code}

Here's an example of the oversized message exceptions in the broker:

{code}
2017-11-28 09:18:12,379 
kafka-network-thread-28-ListenerName(PLAINTEXT)-PLAINTEXT-4 Unexpected error 
from /10.4.12.70; closing connection
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size 
= 335544320 larger than 104857600)
at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:95)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
at 
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at kafka.network.Processor.poll(SocketServer.scala:500)
at kafka.network.Processor.run(SocketServer.scala:435)
at java.lang.Thread.run(Thread.java:748)
{code}
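The "Invalid receive" guard itself is simple; here's an illustrative sketch (not the actual NetworkReceive source) of the check that produces that message, where 104857600 is the socket.request.max.bytes default of 100 MB:

{code:scala}
import java.nio.ByteBuffer

object ReceiveSizeGuard {
  val MaxRequestSize: Int = 104857600 // socket.request.max.bytes default

  // The first four bytes of a receive are interpreted as the payload size;
  // anything over the maximum aborts the receive and closes the connection.
  def readSize(sizeBuffer: ByteBuffer): Int = {
    val size = sizeBuffer.getInt()
    if (size > MaxRequestSize)
      // Kafka throws InvalidReceiveException here; garbage bytes in the size
      // field (e.g. 335544320 = 0x14000000) also trip this check.
      throw new IllegalStateException(
        s"Invalid receive (size = $size larger than $MaxRequestSize)")
    size
  }
}
{code}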

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug exists in the 0.10 versions as well.
> A real case: first, a correlation-id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> 

[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-11-29 Thread Nick Travers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272142#comment-16272142
 ] 

Nick Travers commented on KAFKA-4669:
-------------------------------------

[~hachikuji] - unfortunately, we only have the stack trace related to the I/O 
exception that I posted earlier. We don't log anything lower than WARN for the 
kafka clients.

In our case a producer thread was affected, rather than a consumer.

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug exists in the 0.10 versions as well.
> A real case: first, a correlation-id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-11-28 Thread Nick Travers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269266#comment-16269266
 ] 

Nick Travers commented on KAFKA-4669:
-------------------------------------

We hit this again in production, running 0.11.0.1. Same symptoms as previously 
reported:

{code}
2017-11-28 09:18:20,674 apa158.sjc2b.square kafka-producer-network-thread | 
producer-2 Uncaught error in kafka producer I/O thread: 
java.lang.IllegalStateException: Correlation id for response (1835513) does not 
match request (1835503), request header: 
{api_key=0,api_version=3,correlation_id=1835503,client_id=producer-2}
at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:748)
{code}

A side effect was that this effectively caused a deadlock in the affected JVM: 
it had a thread waiting for the completion of a send (which awaits on a 
latch), but completion could never occur, as the I/O thread had presumably 
crashed:

{code}
"async-message-sender-0" #1761 daemon prio=5 os_prio=0 tid=0x7f3f04006800 
nid=0x4356a waiting on condition [0x7f3de9425000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00075a5e9140> (a 
java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at com.squareup.core.concurrent.Futures2.getAll(Futures2.java:462)
at 
com.squareup.kafka.ng.producer.KafkaProducer.sendMessageBatch(KafkaProducer.java:214)
- locked <0x000734fc4dc0> (a 
com.squareup.kafka.ng.producer.KafkaProducer)
at 
com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.sendBatch(BufferedKafkaProducer.java:585)
at 
com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.run(BufferedKafkaProducer.java:536)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
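Incidentally, the hang mechanism itself is easy to demonstrate in isolation. A minimal, illustrative sketch (not Kafka code; just the latch mechanics from the dump above):

{code:scala}
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchHangDemo extends App {
  val done = new CountDownLatch(1)

  // Stand-in for the producer I/O thread: it dies before it can count the
  // latch down, like a crashed kafka-producer-network-thread.
  val ioThread = new Thread(() => {
    throw new IllegalStateException("simulated correlation-id mismatch")
  }, "fake-producer-network-thread")
  ioThread.setUncaughtExceptionHandler((_, e) => println(s"I/O thread died: $e"))
  ioThread.start()

  // A plain done.await() here would park forever, exactly as in the jstack
  // output; the timed variant makes the hang observable instead.
  val completed = done.await(2, TimeUnit.SECONDS)
  println(s"latch counted down? $completed") // always false
}
{code}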

This doesn't look like an easy one to reproduce on our side, so I'm wondering 
what the best course of action is here. Is it worth re-opening this ticket, 
[~ijuma]?

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug exists in the 0.10 versions as well.
> A real case: first, a correlation-id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> 

[jira] [Commented] (KAFKA-4914) Partition re-assignment tool should check types before persisting state in ZooKeeper

2017-09-23 Thread Nick Travers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177895#comment-16177895
 ] 

Nick Travers commented on KAFKA-4914:
-------------------------------------

[~guozhang] [~omkreddy] - I've had a patch available for this one for a while 
now. Can someone re-assign the ticket (I can't seem to do that myself) so it 
is considered for 1.0? Cheers.

> Partition re-assignment tool should check types before persisting state in 
> ZooKeeper
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4914
> URL: https://issues.apache.org/jira/browse/KAFKA-4914
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>Priority: Minor
> Fix For: 1.0.0
>
>
> The partition-reassignment tool currently allows non-type-safe information to 
> be persisted into ZooKeeper, which can result in a ClassCastException at 
> runtime for brokers.
> Specifically, this occurred when the broker assignment field was a List of 
> Strings, instead of a List of Integers.
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener 
> on Controller 10]: Error while handling broker changes
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at 
> kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436)
> at 
> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
> at scala.collection.immutable.List.exists(List.scala:84)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435)
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> at 
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> at 
> kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)