Hi,

Use the storm-kafka-client version that matches your Kafka server version.

On Tue, Mar 27, 2018, 2:04 PM Han Jing <hanjingjing1...@gmail.com> wrote:
> Hi All,
>
> I'm using Storm-Kafka-Client 1.2.1 to read from a Kafka server (1.0.0, 1.0.1). When the Kafka topic leader process goes down, the Storm Kafka spout gets stuck and the Storm UI stops responding. Even after the Kafka topic leader is moved to another broker, the spout stays stuck. Storm recovers only after the Kafka server process is restarted.
>
> *Is Storm-Kafka-Client 1.2.1 compatible with kafka-client 1.0.0/1.0.1?*
>
> *Here's some code and the Storm log. Please help me with this issue.*
>
> *Thanks a lot.*
>
> --------------------------------------------------------------
>
> *Code:*
>
> The kafka-client version is the same as the Kafka version (1.0.0, 1.0.1).
>
> Kafka is distributed across 3 brokers. Every Kafka topic has 2 replicas and 1 partition.
>
> The KafkaSpout configuration is shown below. The topology reads from just one topic.
>
> TopologyBuilder builder = new TopologyBuilder();
> // Kafka server addresses
> String bootstrapServers = properties.getProperty("bootstrap.servers");
> // Topic consumed by the Kafka spout
> String kafkaReaderTopic = properties.getProperty("storm.kafka.reader.topic");
> // KafkaSpout configuration
> KafkaSpoutConfig<String, String> config =
>     KafkaSpoutConfig.builder(bootstrapServers, kafkaReaderTopic)
>         .setProp(properties)
>         .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
>         .build();
> // Topology spout, KAFKA_READER
> builder.setSpout(BOLT_ID_KAFKA_READER, new KafkaSpout<>(config), 1);
>
> --------------------------------------------------------------
>
> Log:
>
> 518728 2018-03-26 22:42:25.023 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 6 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.46:9092 (id: 3 rack: null), 170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])])
> 518729 2018-03-26 22:42:25.024
o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377894), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
> 518730 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
> 518731 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
> 518732 2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.46:9092 (id: 3 rack: null)
> 518733 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377896), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
> 518734 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
> 518735 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
> 518736 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)
> 518737 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377897), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
> 518738 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
> 518739 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Initiating connection to node 170.0.0.39:9092 (id: 2147483645 rack: null)
> 518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Fetching committed offsets for partitions: [tradeMatch-0]
> 518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Connection with /170.0.0.39 disconnected
> 518742 java.net.ConnectException: Connection refused
> 518743 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_101]
> 518744 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_101]
> 518745 at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-1.0.0.jar:?]
> 518746 at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106) ~[kafka-clients-1.0.0.jar:?]
> 518747 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444) [kafka-clients-1.0.0.jar:?]
> 518748 at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0.jar:?]
> 518749 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0.jar:?]
> 518750 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) [kafka-clients-1.0.0.jar:?]
> 518751 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) [kafka-clients-1.0.0.jar:?]
> 518752 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174) [kafka-clients-1.0.0.jar:?]
> 518753 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472) [kafka-clients-1.0.0.jar:?]
> 518754 at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441) [kafka-clients-1.0.0.jar:?]
> 518755 at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464) [storm-kafka-client-1.2.1.jar:1.2.1]
> 518756 at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) [storm-kafka-client-1.2.1.jar:1.2.1]
> 518757 at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) [storm-kafka-client-1.2.1.jar:1.2.1]
> 518758 at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) [storm-core-1.2.1.jar:1.2.1]
> 518759 at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
> 518760 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 518761 at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
> 518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Node 2147483645 disconnected.
> 518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer clientId=consumer-1, groupId=matchgrp002] Connection to node 2147483645 could not be established. Broker may not be available.
> 518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-1, correlationId=377898) with correlation id 377898 due to node 2147483645 being disconnected
> 518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
> 518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending metadata request (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id: 3 rack: null)
> 518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)
> 518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 7 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null), 170.0.0.46:9092 (id: 3 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])])
> 518769 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377900),
responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
> 518770 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
> 518771 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
> 518772 2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)
> 518773 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377901), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
> 518774 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
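A note on reading the log above: the node id 2147483645 is not a fourth broker. As far as I know, the Kafka consumer opens a separate connection to the group coordinator and labels it with Integer.MAX_VALUE minus the broker id, so 2147483645 should map back to broker 2 (170.0.0.39), the same broker that every FindCoordinatorResponse in the log names and the one refusing connections. The sketch below only reproduces that arithmetic. The class and method names are made up for illustration and are not part of the Kafka or Storm API.

```java
public class CoordinatorNodeId {

    // Assumption: mirrors how the consumer derives the id it logs for the
    // coordinator connection (Integer.MAX_VALUE minus the broker id).
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Inverse mapping: recover the broker id from the id printed in the log.
    static int brokerIdFromCoordinatorId(int coordinatorId) {
        return Integer.MAX_VALUE - coordinatorId;
    }

    public static void main(String[] args) {
        // Broker 2 is the coordinator reported in the FindCoordinatorResponse entries.
        System.out.println(coordinatorConnectionId(2));
        // Map the id from the "Marking the coordinator ... dead" entries back to a broker.
        System.out.println(brokerIdFromCoordinatorId(2147483645));
    }
}
```

If that reading is right, the loop in the log is the consumer repeatedly being told that broker 2 is still the coordinator while broker 2 refuses the connection.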