Hi, I want to setup a kafka cluster in EC2, but I am experiencing some weird 
errors.

The scenario is:

I have a 5 nodes cluster and zookeeper, storm running without any problem. when 
I manually boot each node of kafka by using "JMX_PORT=9997 
bin/kafka-server-start.sh config/server-x.properties &" command.

First node, it can be booted.

Once I boot the second node, the first node is crashed, the error is below:

 

[2013-10-11 04:02:17,200] INFO [Replica Manager on Broker 0]: Handling 
LeaderAndIsr request 
Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:5;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0)
 -> 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:30411),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092
 (kafka.server.ReplicaManager)

[2013-10-11 04:02:17,204] WARN No previously checkpointed highwatermark value 
found for topic test-kafka partition 0. Returning 0 as the highwatermark 
(kafka.server.HighwaterMarkCheckpoint)

[2013-10-11 04:02:17,205] INFO [ReplicaFetcherManager on broker 0] Removing 
fetcher for partition [test-kafka,0] (kafka.server.ReplicaFetcherManager)

[2013-10-11 04:02:17,214] INFO [Kafka Log on Broker 0], Truncated log segment 
/tmp/kafka-logs/test-kafka-0/00000000000000000000.log to target offset 0 
(kafka.log.Log)

[2013-10-11 04:02:17,235] INFO [ReplicaFetcherManager on broker 0] Adding 
fetcher for partition [test-kafka,0], initOffset 0 to broker 1 with fetcherId 0 
(kafka.server.ReplicaFetcherManager)

[2013-10-11 04:02:17,236] INFO [Replica Manager on Broker 0]: Handled leader 
and isr request 
Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:5;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0)
 -> 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:30411),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092
 (kafka.server.ReplicaManager)

[2013-10-11 04:02:17,240] INFO [ReplicaFetcherThread-0-1], Starting  
(kafka.server.ReplicaFetcherThread)

[2013-10-11 04:02:17,266] INFO [Replica Manager on Broker 0]: Handling 
LeaderAndIsr request 
Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:6;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0)
 -> 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:91,ControllerEpoch:30416),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092
 (kafka.server.ReplicaManager)

[2013-10-11 04:02:17,267] INFO [ReplicaFetcherManager on broker 0] Removing 
fetcher for partition [test-kafka,0] (kafka.server.ReplicaFetcherManager)

[2013-10-11 04:02:17,268] INFO [Kafka Log on Broker 0], Truncated log segment 
/tmp/kafka-logs/test-kafka-0/00000000000000000000.log to target offset 0 
(kafka.log.Log)

[2013-10-11 04:02:17,268] INFO [ReplicaFetcherManager on broker 0] Adding 
fetcher for partition [test-kafka,0], initOffset 0 to broker 1 with fetcherId 0 
(kafka.server.ReplicaFetcherManager)

[2013-10-11 04:02:17,269] INFO [Replica Manager on Broker 0]: Handled leader 
and isr request 
Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:6;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0)
 -> 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:91,ControllerEpoch:30416),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092
 (kafka.server.ReplicaManager)

[2013-10-11 04:02:17,269] ERROR [Kafka Request Handler 0 on Broker 0], 
Exception when handling request (kafka.server.KafkaRequestHandler)

[2013-10-11 04:02:47,284] INFO Reconnect due to socket error:  
(kafka.consumer.SimpleConsumer)

 <http://java.net/> java.net.SocketTimeoutException

        at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)

        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)

        at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)

        at kafka.utils.Utils$.read(Utils.scala:394)

        at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)

        at 
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)

        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)

        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)

        at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)

        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)

        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)

        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)

        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)

        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)

        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)

        at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)

        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)

        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)

        at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)

        at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

[2013-10-11 04:02:47,292] ERROR [Kafka Request Handler 1 on Broker 0], 
Exception when handling request (kafka.server.KafkaRequestHandler)

 

Then I boot the third node until the last one, everything is good, except of 
second node.

 

After that, I tried to stop server one by one, I first stopped the broken node, 
then there is one of health node will crash with the same error as the previous 
broken one, it seems random. I stopped that broken node again, then there will 
be another random node broken with the same error.

 

 

When I tried to produce message, it gives me the below errors:

 

 

[2013-10-11 04:13:12,876] INFO Fetching metadata from broker 
id:0,host:localhost,port:9092 with correlation id 15 for 1 topic(s) 
Set(my-replicated-topic) (kafka.client.ClientUtils$)

[2013-10-11 04:13:12,876] INFO Connected to localhost:9092 for producing 
(kafka.producer.SyncProducer)

[2013-10-11 04:13:12,886] INFO Disconnecting from localhost:9092 
(kafka.producer.SyncProducer)

[2013-10-11 04:13:12,886] INFO Closing socket connection to / 
<http://127.0.0.1/> 127.0.0.1. (kafka.network.Processor)

[2013-10-11 04:13:12,887] WARN Error while fetching metadata [{TopicMetadata 
for topic my-replicated-topic ->

No partition metadata for topic my-replicated-topic due to 
kafka.common.LeaderNotAvailableException}] for topic [my-replicated-topic]: 
class kafka.common.LeaderNotAvailableException  
(kafka.producer.BrokerPartitionInfo)

[2013-10-11 04:13:12,887] ERROR Failed to collate messages by topic, partition 
due to: Failed to fetch topic metadata for topic: my-replicated-topic 
(kafka.producer.async.DefaultEventHandler)

[2013-10-11 04:13:12,887] INFO Back off for 100 ms before retrying send. 
Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)

[2013-10-11 04:13:12,988] INFO Fetching metadata from broker 
id:0,host:localhost,port:9092 with correlation id 16 for 1 topic(s) 
Set(my-replicated-topic) (kafka.client.ClientUtils$)

[2013-10-11 04:13:12,989] INFO Connected to localhost:9092 for producing 
(kafka.producer.SyncProducer)

[2013-10-11 04:13:12,999] INFO Disconnecting from localhost:9092 
(kafka.producer.SyncProducer)

[2013-10-11 04:13:12,999] INFO Closing socket connection to / 
<http://127.0.0.1/> 127.0.0.1. (kafka.network.Processor)

[2013-10-11 04:13:13,000] WARN Error while fetching metadata [{TopicMetadata 
for topic my-replicated-topic ->

No partition metadata for topic my-replicated-topic due to 
kafka.common.LeaderNotAvailableException}] for topic [my-replicated-topic]: 
class kafka.common.LeaderNotAvailableException  
(kafka.producer.BrokerPartitionInfo)

[2013-10-11 04:13:13,000] ERROR Failed to send requests for topics 
my-replicated-topic with correlation ids in [9,16] 
(kafka.producer.async.DefaultEventHandler)

[2013-10-11 04:13:13,001] ERROR Error in handling batch of 1 events 
(kafka.producer.async.ProducerSendThread)

kafka.common.FailedToSendMessageException: Failed to send messages after 3 
tries.

        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)

        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)

        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)

        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)

        at scala.collection.immutable.Stream.foreach(Stream.scala:254)

        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)

        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

 

I configured everything according to the documents.

I copied the setting from one of my nodes

 

 <http://broker.id/> broker.id=3

 

############################# Socket Server Settings 
#############################

 

port=9092

 

 

num.network.threads=2

 

num.io.threads=2

 

socket.send.buffer.bytes=1048576

 

socket.receive.buffer.bytes=1048576

 

socket.request.max.bytes=104857600

 

log.dir=/tmp/kafka-logs

 

num.partitions=1

 

log.flush.interval.messages=10000

 

 <http://log.flush.interval.ms/> log.flush.interval.ms=1000

 

log.retention.hours=168

 

log.segment.bytes=536870912

 

 

log.cleanup.interval.mins=1

zookeeper.connect=localhost:2181

 

 

 <http://zookeeper.connection.timeout.ms/> 
zookeeper.connection.timeout.ms=1000000

 

 

kafka.metrics.polling.interval.secs=5

kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

kafka.csv.metrics.dir=/tmp/kafka_metrics

 

kafka.csv.metrics.reporter.enabled=false

 

 

FYI There is no any problem if I just run 1 single node.

Any help will be appreciated!

 

Reply via email to