Hello, I am having a most annoying issue. I am trying to create a local Kafka cluster via Docker for development, using Docker Compose. I have ZooKeeper and Kafka in separate containers, with Docker links used to set up communication between them, and a separate service that is trying to produce onto a topic.
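For context, the compose setup looks roughly like this (an illustrative sketch rather than my exact files, which I am happy to post in full; the image names and environment variable names below are placeholders):

zookeeper:
  image: some/zookeeper-image        # placeholder image name
  ports:
    - "2181:2181"

kafka:
  image: some/kafka-image            # placeholder image name
  links:
    - zookeeper
  ports:
    - "9092:9092"
  environment:
    # the broker advertises itself as host "kafka" on port 9092,
    # which the linked producer container resolves via the docker link
    ADVERTISED_HOST: kafka
    ADVERTISED_PORT: "9092"
    ZOOKEEPER_CONNECT: zookeeper:2181

producer:
  build: ./producer                  # the service doing the producing
  links:
    - kafka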
When I try to do this I get the following trace level logging in the producing service:

DEBUG 2015-05-06 20:42:34,617 kafka.utils.Logging$class (Logging.scala:52) [pool-10-thread-2] Handling 1 events
TRACE 2015-05-06 20:42:34,634 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] Instantiating Scala Sync Producer with properties: {metadata.broker.list=kafka:9092, request.required.acks=1, port=9092, host=kafka, client.id=105f0bb5-6f39-4974-8c27-7dab575bcd80, producer.type=sync}
INFO 2015-05-06 20:42:34,635 kafka.utils.Logging$class (Logging.scala:68) [pool-10-thread-2] Fetching metadata from broker id:0,host:kafka,port:9092 with correlation id 0 for 1 topic(s) Set(foo)
TRACE 2015-05-06 20:42:34,637 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] verifying sendbuffer of size 60
DEBUG 2015-05-06 20:42:34,638 kafka.utils.Logging$class (Logging.scala:52) [pool-10-thread-2] Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 186240 (requested -1), SO_SNDBUF = 102400 (requested 102400), connectTimeoutMs = 10000.
INFO 2015-05-06 20:42:34,639 kafka.utils.Logging$class (Logging.scala:68) [pool-10-thread-2] Connected to kafka:9092 for producing
TRACE 2015-05-06 20:42:34,640 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] 64 bytes written.
TRACE 2015-05-06 20:42:34,753 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] 28 bytes read.
INFO 2015-05-06 20:42:34,757 kafka.utils.Logging$class (Logging.scala:68) [pool-10-thread-2] Disconnecting from kafka:9092
DEBUG 2015-05-06 20:42:34,758 kafka.utils.Logging$class (Logging.scala:52) [pool-10-thread-2] Successfully fetched metadata for 1 topic(s) Set(foo)
TRACE 2015-05-06 20:42:34,762 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] Metadata for topic foo is {TopicMetadata for topic foo -> No partition metadata for topic foo due to kafka.common.LeaderNotAvailableException}
WARN 2015-05-06 20:42:34,763 kafka.utils.Logging$class (Logging.scala:83) [pool-10-thread-2] Error while fetching metadata [{TopicMetadata for topic foo -> No partition metadata for topic foo due to kafka.common.LeaderNotAvailableException}] for topic [foo]: class kafka.common.LeaderNotAvailableException
DEBUG 2015-05-06 20:42:34,764 kafka.utils.Logging$class (Logging.scala:52) [pool-10-thread-2] Getting broker partition info for topic foo
TRACE 2015-05-06 20:42:34,765 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] Instantiating Scala Sync Producer with properties: {metadata.broker.list=kafka:9092, request.required.acks=1, port=9092, host=kafka, client.id=105f0bb5-6f39-4974-8c27-7dab575bcd80, producer.type=sync}
INFO 2015-05-06 20:42:34,765 kafka.utils.Logging$class (Logging.scala:68) [pool-10-thread-2] Fetching metadata from broker id:0,host:kafka,port:9092 with correlation id 1 for 1 topic(s) Set(foo)
TRACE 2015-05-06 20:42:34,765 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] verifying sendbuffer of size 60
DEBUG 2015-05-06 20:42:34,765 kafka.utils.Logging$class (Logging.scala:52) [pool-10-thread-2] Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 186240 (requested -1), SO_SNDBUF = 102400 (requested 102400), connectTimeoutMs = 10000.
INFO 2015-05-06 20:42:34,765 kafka.utils.Logging$class (Logging.scala:68) [pool-10-thread-2] Connected to kafka:9092 for producing
TRACE 2015-05-06 20:42:34,766 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] 64 bytes written.
TRACE 2015-05-06 20:42:34,773 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] 28 bytes read.
INFO 2015-05-06 20:42:34,774 kafka.utils.Logging$class (Logging.scala:68) [pool-10-thread-2] Disconnecting from kafka:9092
DEBUG 2015-05-06 20:42:34,774 kafka.utils.Logging$class (Logging.scala:52) [pool-10-thread-2] Successfully fetched metadata for 1 topic(s) Set(foo)
TRACE 2015-05-06 20:42:34,774 kafka.utils.Logging$class (Logging.scala:36) [pool-10-thread-2] Metadata for topic foo is {TopicMetadata for topic foo -> No partition metadata for topic foo due to kafka.common.LeaderNotAvailableException}
WARN 2015-05-06 20:42:34,774 kafka.utils.Logging$class (Logging.scala:83) [pool-10-thread-2] Error while fetching metadata [{TopicMetadata for topic foo -> No partition metadata for topic foo due to kafka.common.LeaderNotAvailableException}] for topic [foo]: class kafka.common.LeaderNotAvailableException
ERROR 2015-05-06 20:42:34,775 kafka.utils.Logging$class (Logging.scala:97) [pool-10-thread-2] Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: foo
--- SNIP --- Lots of retry action etc.

Obviously something is wrong. However, on the server side I see:

kafka_1 | [2015-05-06 20:42:34,724] INFO Topic creation {"version":1,"partitions":{"0":[9092]}} (kafka.admin.AdminUtils$)
kafka_1 | [2015-05-06 20:42:34,732] INFO [KafkaApi-9092] Auto creation of topic foo with 1 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)

Seems ok, but this is followed by a whole load of:

kafka_1 | [2015-05-06 20:42:34,837] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:42:34,837] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:42:34,883] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:42:34,890] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:42:34,998] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:42:35,006] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:42:35,114] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:42:35,122] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:42:35,228] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:43:02,926] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:43:03,032] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:43:03,038] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:43:03,144] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:43:03,150] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:43:03,256] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:43:03,261] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
kafka_1 | [2015-05-06 20:43:03,368] INFO Closing socket connection to /172.17.0.198. (kafka.network.Processor)
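For reference, the producer side boils down to the old Scala producer API with the properties visible in the client trace above. A minimal equivalent sketch (not my actual code; the StringEncoder serializer and the message type are assumptions on my part) would be:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object FooProducer extends App {
  // Mirrors the properties shown in the trace: metadata.broker.list=kafka:9092,
  // request.required.acks=1, producer.type=sync. The serializer is assumed.
  val props = new Properties()
  props.put("metadata.broker.list", "kafka:9092")
  props.put("request.required.acks", "1")
  props.put("producer.type", "sync")
  props.put("serializer.class", "kafka.serializer.StringEncoder")

  val producer = new Producer[String, String](new ProducerConfig(props))
  producer.send(new KeyedMessage[String, String]("foo", "test message"))
  producer.close()
}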
Now, looking at the code in Processor.scala, those "Closing socket connection" messages look like the producer shutting down the connection, probably because of the leader-not-found problems. Looking at the state change log on the server I see this:

[2015-05-06 20:42:34,807] TRACE Controller 9092 epoch 1 changed partition [foo,0] state from NonExistentPartition to NewPartition with assigned replicas 9092 (state.change.logger)
[2015-05-06 20:42:34,815] TRACE Controller 9092 epoch 1 changed state of replica 9092 for partition [foo,0] from NonExistentReplica to NewReplica (state.change.logger)
[2015-05-06 20:42:34,873] TRACE Controller 9092 epoch 1 changed partition [foo,0] from NewPartition to OnlinePartition with leader 9092 (state.change.logger)
[2015-05-06 20:42:34,876] TRACE Controller 9092 epoch 1 sending become-leader LeaderAndIsr request (Leader:9092,ISR:9092,LeaderEpoch:0,ControllerEpoch:1) with correlationId 7 to broker 9092 for partition [foo,0] (state.change.logger)
[2015-05-06 20:42:34,878] TRACE Controller 9092 epoch 1 sending UpdateMetadata request (Leader:9092,ISR:9092,LeaderEpoch:0,ControllerEpoch:1) with correlationId 7 to broker 9092 for partition [foo,0] (state.change.logger)
[2015-05-06 20:42:34,882] TRACE Controller 9092 epoch 1 changed state of replica 9092 for partition [foo,0] from NewReplica to OnlineReplica (state.change.logger)

Which of course all seems totally reasonable! So I am struggling to see what the problem is. Lastly, in ZooKeeper I see the following:

get /brokers/ids/9092
{"jmx_port":-1,"timestamp":"1430944879746","host":"kafka","version":1,"port":9092}
cZxid = 0x12
ctime = Wed May 06 20:41:19 UTC 2015
mZxid = 0x12
mtime = Wed May 06 20:41:19 UTC 2015
pZxid = 0x12
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x14d2af603f10000
dataLength = 82
numChildren = 0

The broker is there, with the right name and port advertised.

get /brokers/topics/foo
{"version":1,"partitions":{"0":[9092]}}
cZxid = 0x1b
ctime = Wed May 06 20:42:34 UTC 2015
mZxid = 0x1b
mtime = Wed May 06 20:42:34 UTC 2015
pZxid = 0x1e
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 39
numChildren = 1

Looks like we have the right data for the topic as well...

get /brokers/topics/foo/partitions/0/state
{"controller_epoch":1,"leader":9092,"version":1,"leader_epoch":0,"isr":[9092]}
cZxid = 0x20
ctime = Wed May 06 20:42:34 UTC 2015
mZxid = 0x20
mtime = Wed May 06 20:42:34 UTC 2015
pZxid = 0x20
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 78
numChildren = 0

And we have state for the partition with the right leader id and ISR. What on earth is wrong? Note that the producer code works absolutely fine with a local (i.e. non-containerized) deploy, so I am sure it is the Kafka/ZooKeeper setup, but nothing stands out to me as wrong. Any help much appreciated. I will supply the Docker Compose files on request.

Ben
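EDIT: if it helps anyone reproduce this, the producer's failing metadata fetch can be replayed with a few lines against the 0.8.x SimpleConsumer API (a sketch written from memory, so treat the exact signatures as approximate; hostnames match the compose setup above):

import kafka.api.TopicMetadataRequest
import kafka.consumer.SimpleConsumer

object MetadataCheck extends App {
  // Ask the broker, via its linked hostname, for metadata on topic "foo" --
  // the same request the producer issues before sending.
  val consumer = new SimpleConsumer("kafka", 9092, 10000, 64 * 1024, "metadata-check")
  try {
    val response = consumer.send(new TopicMetadataRequest(Seq("foo"), 0))
    response.topicsMetadata.foreach { tm =>
      println(s"topic=${tm.topic} errorCode=${tm.errorCode}")
      tm.partitionsMetadata.foreach { pm =>
        println(s"  partition=${pm.partitionId} leader=${pm.leader} isr=${pm.isr} errorCode=${pm.errorCode}")
      }
    }
  } finally {
    consumer.close()
  }
}

If the leader comes back empty there too, that would at least narrow it down to what the broker itself is serving, rather than how the producer parses it.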