Hi there, I skimmed through this message and I'm not sure how this problem is related to ZK. Could you explain why you think it is ZooKeeper that is causing the problem with your Kafka cluster?
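One observation from the logs below, offered as a guess rather than a confirmed diagnosis: every broker registers itself as host:localhost,port:9092, so replica fetchers and producers on other machines end up dialing their own loopback interface. If the five brokers run on separate EC2 instances, each server-x.properties needs a distinct broker.id, a host name the other instances can reach, and a shared zookeeper.connect string. A minimal sketch for Kafka 0.8 (the host names are placeholders, not taken from this setup):

```properties
# config/server-1.properties -- one file per broker; broker.id and
# host.name differ per instance.
broker.id=1
port=9092
# Address the OTHER brokers and clients use to reach this broker.
# If unset, the broker may register a name (e.g. "localhost") that is
# meaningless on remote machines.
host.name=ec2-203-0-113-10.compute-1.amazonaws.com
# All brokers must point at the same ZooKeeper ensemble; if each broker
# only sees a ZooKeeper on its own localhost, they never see each other.
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181
```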
Thanks,
-Flavio

> -----Original Message-----
> From: Yi Jiang [mailto:[email protected]]
> Sent: 11 October 2013 05:48
> To: [email protected]
> Subject: Weird error from Kafka cluster
>
> Hi, I want to set up a Kafka cluster in EC2, but I am experiencing some weird errors.
>
> The scenario is:
>
> I have a 5-node cluster, and ZooKeeper and Storm are running without any problem. I boot each Kafka node manually with the command "JMX_PORT=9997 bin/kafka-server-start.sh config/server-x.properties &".
>
> The first node boots fine.
>
> Once I boot the second node, the first node crashes with the error below:
>
> [2013-10-11 04:02:17,200] INFO [Replica Manager on Broker 0]: Handling LeaderAndIsr request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:5;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:30411),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 (kafka.server.ReplicaManager)
> [2013-10-11 04:02:17,204] WARN No previously checkpointed highwatermark value found for topic test-kafka partition 0. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint)
> [2013-10-11 04:02:17,205] INFO [ReplicaFetcherManager on broker 0] Removing fetcher for partition [test-kafka,0] (kafka.server.ReplicaFetcherManager)
> [2013-10-11 04:02:17,214] INFO [Kafka Log on Broker 0], Truncated log segment /tmp/kafka-logs/test-kafka-0/00000000000000000000.log to target offset 0 (kafka.log.Log)
> [2013-10-11 04:02:17,235] INFO [ReplicaFetcherManager on broker 0] Adding fetcher for partition [test-kafka,0], initOffset 0 to broker 1 with fetcherId 0 (kafka.server.ReplicaFetcherManager)
> [2013-10-11 04:02:17,236] INFO [Replica Manager on Broker 0]: Handled leader and isr request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:5;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:30411),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 (kafka.server.ReplicaManager)
> [2013-10-11 04:02:17,240] INFO [ReplicaFetcherThread-0-1], Starting (kafka.server.ReplicaFetcherThread)
> [2013-10-11 04:02:17,266] INFO [Replica Manager on Broker 0]: Handling LeaderAndIsr request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:6;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:91,ControllerEpoch:30416),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 (kafka.server.ReplicaManager)
> [2013-10-11 04:02:17,267] INFO [ReplicaFetcherManager on broker 0] Removing fetcher for partition [test-kafka,0] (kafka.server.ReplicaFetcherManager)
> [2013-10-11 04:02:17,268] INFO [Kafka Log on Broker 0], Truncated log segment /tmp/kafka-logs/test-kafka-0/00000000000000000000.log to target offset 0 (kafka.log.Log)
> [2013-10-11 04:02:17,268] INFO [ReplicaFetcherManager on broker 0] Adding fetcher for partition [test-kafka,0], initOffset 0 to broker 1 with fetcherId 0 (kafka.server.ReplicaFetcherManager)
> [2013-10-11 04:02:17,269] INFO [Replica Manager on Broker 0]: Handled leader and isr request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:6;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:91,ControllerEpoch:30416),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 (kafka.server.ReplicaManager)
> [2013-10-11 04:02:17,269] ERROR [Kafka Request Handler 0 on Broker 0], Exception when handling request (kafka.server.KafkaRequestHandler)
> [2013-10-11 04:02:47,284] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
> java.net.SocketTimeoutException
>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>     at kafka.utils.Utils$.read(Utils.scala:394)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2013-10-11 04:02:47,292] ERROR [Kafka Request Handler 1 on Broker 0], Exception when handling request (kafka.server.KafkaRequestHandler)
>
> Then I boot the third node through the last one, and everything is fine except the second node.
>
> After that, I tried to stop the servers one by one. I first stopped the broken node; then one of the healthy nodes crashed with the same error as the previous broken one, seemingly at random. I stopped that broken node too, and yet another random node broke with the same error.
>
> When I try to produce messages, I get the errors below:
>
> [2013-10-11 04:13:12,876] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 15 for 1 topic(s) Set(my-replicated-topic) (kafka.client.ClientUtils$)
> [2013-10-11 04:13:12,876] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
> [2013-10-11 04:13:12,886] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
> [2013-10-11 04:13:12,886] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> [2013-10-11 04:13:12,887] WARN Error while fetching metadata [{TopicMetadata for topic my-replicated-topic -> No partition metadata for topic my-replicated-topic due to kafka.common.LeaderNotAvailableException}] for topic [my-replicated-topic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
> [2013-10-11 04:13:12,887] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: my-replicated-topic (kafka.producer.async.DefaultEventHandler)
> [2013-10-11 04:13:12,887] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
> [2013-10-11 04:13:12,988] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 16 for 1 topic(s) Set(my-replicated-topic) (kafka.client.ClientUtils$)
> [2013-10-11 04:13:12,989] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
> [2013-10-11 04:13:12,999] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
> [2013-10-11 04:13:12,999] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> [2013-10-11 04:13:13,000] WARN Error while fetching metadata [{TopicMetadata for topic my-replicated-topic -> No partition metadata for topic my-replicated-topic due to kafka.common.LeaderNotAvailableException}] for topic [my-replicated-topic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
> [2013-10-11 04:13:13,000] ERROR Failed to send requests for topics my-replicated-topic with correlation ids in [9,16] (kafka.producer.async.DefaultEventHandler)
> [2013-10-11 04:13:13,001] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> I configured everything according to the documentation. Here is the configuration copied from one of my nodes:
>
> broker.id=3
>
> ############################# Socket Server Settings #############################
> port=9092
> num.network.threads=2
> num.io.threads=2
> socket.send.buffer.bytes=1048576
> socket.receive.buffer.bytes=1048576
> socket.request.max.bytes=104857600
> log.dir=/tmp/kafka-logs
> num.partitions=1
> log.flush.interval.messages=10000
> log.flush.interval.ms=1000
> log.retention.hours=168
> log.segment.bytes=536870912
> log.cleanup.interval.mins=1
> zookeeper.connect=localhost:2181
> zookeeper.connection.timeout.ms=1000000
> kafka.metrics.polling.interval.secs=5
> kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
> kafka.csv.metrics.dir=/tmp/kafka_metrics
> kafka.csv.metrics.reporter.enabled=false
>
> FYI, there is no problem at all if I just run a single node.
>
> Any help will be appreciated!
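A side note on why the host:localhost entries above matter: in Kafka 0.8, each broker writes a small JSON registration under /brokers/ids/<id> in ZooKeeper, and every other broker and client resolves the broker's address from that znode. A toy sketch of that lookup (the registration payload here is simplified and made up, not read from this cluster):

```python
import json

# Simplified shape of a Kafka 0.8 broker registration znode
# (/brokers/ids/1). The "host" value is whatever the broker derived from
# its config -- with no host.name set, it can end up as "localhost".
registration = json.dumps({"host": "localhost", "port": 9092})

def broker_endpoint(znode_data):
    """Return the host:port another machine would use to reach this broker."""
    info = json.loads(znode_data)
    return "%s:%d" % (info["host"], info["port"])

# A replica fetcher on a *different* EC2 instance would dial this address,
# i.e. its own loopback interface -- which is why its fetches time out.
print(broker_endpoint(registration))  # -> localhost:9092
```

The same mechanism explains the producer failures: the metadata response hands back localhost:9092 as the leader's address, so the producer never reaches the real leader.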
