When I use Storm-Kafka-Client 1.2.1, kafka-client 0.10.2.1, kafak-server 1.0.0. leader down, kafka spout went well.
But When I use Storm-kafka-client 1.2.1,kafka-client1.0.0 (the same as kafka sever version),kafka –server 1.0.0. Kafka spout stuck when kafka leader down. Is Storm-Kafka-Client 1.2.1 really compatible with kafka-client 1.0.0/1.0.1 ??? I guess there’re some version issue with kafka-client 1.0.0/1.0.1 and kafka-client 1.0.0/1.0.1 发件人: Ajeesh [mailto:[email protected]] 发送时间: 2018年3月27日 16:52 收件人: [email protected] 主题: Re: Storm kafka Spout Stuck When Kafka leader is Down Hi, Use the storm-kafka-client version same as Kafka Server version On Tue, Mar 27, 2018, 2:04 PM Han Jing <[email protected] <mailto:[email protected]> > wrote: Hi All, I’m using Storm-Kafka-Client 1.2.1 to read from Kafka sever(1.0.0, 1.0.1).When Kafka topic leader progress is down, Storm Kafka Spout is stuck, there is no responseon UI website ,even kakfa topic leader is alter to another broker, It’s still stuck, until restart the kafka server progress. Storm recovered from struk. Is Storm-Kafka-Client 1.2.1 compatible with kafka-client 1.0.0/1.0.1? Here’s some code and Storm log.Please help me with this issue. Thanks a lot. -------------------------------------------------------------- Code: Kafka-client version is the same the kafka version(1.0.0,1.0.1). Kafka is distribute on 3 brokers. There are 2 replicators and 1 partition for every Kafka topic. KafkaSpout configureateion is as below. The topology read from just one topic. TopologyBuilder builder = new TopologyBuilder(); //kafka Servers IP String bootstrapServers = properties.getProperty("bootstrap.servers"); //Kafka Spout consumer topic String kafkaReaderTopic = properties.getProperty("storm.kafka.reader.topic"); //KafkaSpout KafkaSpoutConfig<String, String> config = KafkaSpoutConfig.builder(bootstrapServers, kafkaReaderTopic) .setProp(properties) .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) .build(); //topology Spout,KAFKA_READER builder.setSpout(BOLT_ID_KAFKA_READER, new KafkaSpout<>(config), 1); -------------------------------------------------------------- Log: 518728 2018-03-26 22:42:25.023 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 6 to Cluster(i d = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.46:9092 <http://170.0.0.46:9092> (id: 3 rack: null), 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2 rack: null), 170.0.0.38:9092 <http://170.0.0.38:9092> (id: 1 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])]) 518729 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1, disconnected=false, requ estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377894), responseBody=FindCoordinatorRe sponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2 rack: null))) 518730 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) 518731 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) dead 518732 2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.46:9092 <http://170.0.0.46:9092> (id: 3 rack: null) 518733 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0, disconnected=false, requ estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377896), responseBody=FindCoordinatorRe sponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2 rack: null))) 518734 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) 518735 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) dead 518736 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 <http://170.0.0.38:9092> (id: 1 rack: null) 518737 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0, disconnected=false, requ estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377897), responseBody=FindCoordinatorRe sponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2 rack: null))) 518738 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) 518739 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat chgrp002] Initiating connection to node 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) 518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Fetching committed offsets for partitions: [tradeMatch-0] 518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchg rp002] Connection with /170.0.0.39 <http://170.0.0.39> disconnected 518742 java.net.ConnectException: Connection refused 518743 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_101] 518744 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_101] 518745 at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-1.0.0.j ar:?] 518746 at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106) ~[kafka-clients-1.0.0.jar:?] 518747 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444) [kafka-clients-1.0.0.jar:?] 518748 at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0.jar:?] 518749 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0.jar:?] 518750 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) [kafka-clients-1.0.0.ja r:?] 518751 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) [kafka-clients-1.0.0.ja r:?] 518752 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174) [kafka-clients-1.0.0.ja r:?] 518753 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472) [kafka-cli ents-1.0.0.jar:?] 518754 at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441) [kafka-clients-1.0.0.jar:?] 518755 at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464) [storm-kafka-client-1.2.1.jar:1.2.1] 518756 at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) [storm-kafka-client-1.2.1.jar:1.2.1] 518757 at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) [storm-kafka-client-1.2.1.jar:1.2.1] 518758 at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) [storm-core-1.2.1.jar:1.2.1] 518759 at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1] 518760 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 518761 at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101] 518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat chgrp002] Node 2147483645 disconnected. 518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer clientId=consumer-1, groupId=matc hgrp002] Connection to node 2147483645 could not be established. Broker may not be available. 518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-1, correlationId= 377898) with correlation id 377898 due to node 2147483645 being disconnected 518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) dead 518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat chgrp002] Sending metadata request (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 <http://170.0.0.46:9092> (id: 3 rack: null) 518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 <http://170.0.0.38:9092> (id: 1 rack: null) 518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 7 to Cluster(i d = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2 rack: null), 170.0.0.38:9092 <http://170.0.0.38:9092> (id: 1 rack: null), 170.0.0.46:9092 <http://170.0.0.46:9092> (id: 3 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])]) 518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Fetching committed offsets for partitions: [tradeMatch-0] 518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchg rp002] Connection with /170.0.0.39 <http://170.0.0.39> disconnected 518742 java.net.ConnectException: Connection refused 518743 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_101] 518744 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_101] 518745 at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-1.0.0.j ar:?] 518746 at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106) ~[kafka-clients-1.0.0.jar:?] 518747 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444) [kafka-clients-1.0.0.jar:?] 518748 at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0.jar:?] 518749 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0.jar:?] 518750 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) [kafka-clients-1.0.0.ja r:?] 518751 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) [kafka-clients-1.0.0.ja r:?] 518752 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174) [kafka-clients-1.0.0.ja r:?] 518753 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472) [kafka-cli ents-1.0.0.jar:?] 518754 at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441) [kafka-clients-1.0.0.jar:?] 518755 at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464) [storm-kafka-client-1.2.1.jar:1.2.1] 518756 at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) [storm-kafka-client-1.2.1.jar:1.2.1] 518757 at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) [storm-kafka-client-1.2.1.jar:1.2.1] 518758 at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) [storm-core-1.2.1.jar:1.2.1] 518759 at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1] 518760 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 518761 at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101] 518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat chgrp002] Node 2147483645 disconnected. 518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer clientId=consumer-1, groupId=matc hgrp002] Connection to node 2147483645 could not be established. Broker may not be available. 518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-1, correlationId= 377898) with correlation id 377898 due to node 2147483645 being disconnected 518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) dead 518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat chgrp002] Sending metadata request (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 <http://170.0.0.46:9092> (id: 3 rack: null) 518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 <http://170.0.0.38:9092> (id: 1 rack: null) 518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 7 to Cluster(i d = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2 rack: null), 170.0.0.38:9092 <http://170.0.0.38:9092> (id: 1 rack: null), 170.0.0.46:9092 <http://170.0.0.46:9092> (id: 3 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])]) 518769 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0, disconnected=false, requ estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377900), responseBody=FindCoordinatorRe sponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2 rack: null))) 518770 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) 518771 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null) dead 518772 2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 <http://170.0.0.38:9092> (id: 1 rack: null) 518773 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1, disconnected=false, requ estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377901), responseBody=FindCoordinatorRe sponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2 rack: null))) 518774 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 <http://170.0.0.39:9092> (id: 2147483645 rack: null)
