Hi, Use the storm-kafka-client version same as Kafka Server version

On Tue, Mar 27, 2018, 2:04 PM Han Jing <hanjingjing1...@gmail.com> wrote:

> Hi All,
>
>        I’m using Storm-Kafka-Client 1.2.1 to read from Kafka sever(1.0.0,
> 1.0.1).When Kafka topic leader progress is down, Storm Kafka Spout is
> stuck, there is no responseon UI website ,even kakfa topic leader is alter
> to another broker, It’s still stuck, until restart the kafka server
> progress. Storm recovered from struk.
>
> *       Is Storm-Kafka-Client 1.2.1 compatible with kafka-client
> 1.0.0/1.0.1?*
>
>
>
> *       Here’s some code and Storm log.Please help me with this issue.*
>
> *Thanks a lot.*
>
>        --------------------------------------------------------------
>
> *Code:*
>
> Kafka-client version is the same the kafka version(1.0.0,1.0.1).
>
>        Kafka is distribute on 3 brokers. There are 2 replicators  and 1
> partition for every Kafka topic.
>
>        KafkaSpout configureateion is as below. The topology read from just
> one topic.
>
> TopologyBuilder builder = new TopologyBuilder();
> //kafka Servers IP
> String bootstrapServers = properties.getProperty("bootstrap.servers");
> //Kafka Spout consumer topic
> String kafkaReaderTopic = properties.getProperty("storm.kafka.reader.topic");
> //KafkaSpout
> KafkaSpoutConfig<String, String> config = 
> KafkaSpoutConfig.builder(bootstrapServers, kafkaReaderTopic)
>         .setProp(properties)
>         
> .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.*EARLIEST*)
>         .build();
> //topology Spout,KAFKA_READER
> builder.setSpout(*BOLT_ID_KAFKA_READER*, new KafkaSpout<>(config), 1);
>
>        --------------------------------------------------------------
>
>        Log:
>
>        518728 2018-03-26 22:42:25.023 o.a.k.c.Metadata
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata
> version 6 to Cluster(i       d = 0u-o-B0qTBGm7pCQ550QBw, nodes = [
> 170.0.0.46:9092 (id: 3 rack: null), 170.0.0.39:9092 (id: 2 rack: null),
> 170.0.0.38:9092 (id: 1 rack:        null)], partitions = [Partition(topic
> = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2],
> offlineReplicas = [])])
>
> 518729 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
> response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1,
> disconnected=false, requ
> estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
> clientId=consumer-1, correlationId=377894),
> responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0,
> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518730 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518731 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>
> 518732 2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
> request to broker 170.0.0.46:9092 (id: 3 rack: null)
>
> 518733 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
> response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0,
> disconnected=false, requ
> estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
> clientId=consumer-1, correlationId=377896),
> responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0,
> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518734 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518735 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>
> 518736 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>
> 518737 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
> response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0,
> disconnected=false, requ
> estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
> clientId=consumer-1, correlationId=377897),
> responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0,
> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518738 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518739 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, groupId=mat       chgrp002] Initiating connection to
> node 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Fetching committed offsets
> for partitions: [tradeMatch-0]
>
> 518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, groupId=matchg       rp002] Connection with /
> 170.0.0.39 disconnected
>
> 518742 java.net.ConnectException: Connection refused
>
> 518743         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> ~[?:1.8.0_101]
>
> 518744         at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> ~[?:1.8.0_101]
>
> 518745         at
> org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
> ~[kafka-clients-1.0.0.j       ar:?]
>
> 518746         at
> org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
> ~[kafka-clients-1.0.0.jar:?]
>
> 518747         at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
> [kafka-clients-1.0.0.jar:?]
>
> 518748         at
> org.apache.kafka.common.network.Selector.poll(Selector.java:398)
> [kafka-clients-1.0.0.jar:?]
>
> 518749         at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
> [kafka-clients-1.0.0.jar:?]
>
> 518750         at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
> [kafka-clients-1.0.0.ja       r:?]
>
> 518751         at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
> [kafka-clients-1.0.0.ja       r:?]
>
> 518752         at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
> [kafka-clients-1.0.0.ja       r:?]
>
> 518753         at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472)
> [kafka-cli       ents-1.0.0.jar:?]
>
> 518754         at
> org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441)
> [kafka-clients-1.0.0.jar:?]
>
> 518755         at
> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464)
> [storm-kafka-client-1.2.1.jar:1.2.1]
>
> 518756         at
> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
> [storm-kafka-client-1.2.1.jar:1.2.1]
>
> 518757         at
> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
> [storm-kafka-client-1.2.1.jar:1.2.1]
>
> 518758         at
> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
> [storm-core-1.2.1.jar:1.2.1]
>
> 518759         at
> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
> [storm-core-1.2.1.jar:1.2.1]
>
> 518760         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>
> 518761         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
>
> 518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, groupId=mat       chgrp002] Node 2147483645
> disconnected.
>
> 518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
> Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer
> clientId=consumer-1, groupId=matc       hgrp002] Connection to node
> 2147483645 could not be established. Broker may not be available.
>
> 518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1,        groupId=matchgrp002] Cancelled OFFSET_FETCH
> request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3,
> clientId=consumer-1, correlationId=       377898) with correlation id
> 377898 due to node 2147483645 being disconnected
>
> 518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>
> 518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, groupId=mat       chgrp002] Sending metadata request
> (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id: 3
> rack: null)
>
> 518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>
> 518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata
> version 7 to Cluster(i       d = 0u-o-B0qTBGm7pCQ550QBw, nodes = [
> 170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null),
> 170.0.0.46:9092 (id: 3 rack:        null)], partitions = [Partition(topic
> = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2],
> offlineReplicas = [])])
>
> 518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Fetching committed offsets
> for partitions: [tradeMatch-0]
>
> 518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, groupId=matchg       rp002] Connection with /
> 170.0.0.39 disconnected
>
> 518742 java.net.ConnectException: Connection refused
>
> 518743         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> ~[?:1.8.0_101]
>
> 518744         at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> ~[?:1.8.0_101]
>
> 518745         at
> org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
> ~[kafka-clients-1.0.0.j       ar:?]
>
> 518746         at
> org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
> ~[kafka-clients-1.0.0.jar:?]
>
> 518747         at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
> [kafka-clients-1.0.0.jar:?]
>
> 518748         at
> org.apache.kafka.common.network.Selector.poll(Selector.java:398)
> [kafka-clients-1.0.0.jar:?]
>
> 518749         at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
> [kafka-clients-1.0.0.jar:?]
>
> 518750         at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
> [kafka-clients-1.0.0.ja       r:?]
>
> 518751         at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
> [kafka-clients-1.0.0.ja       r:?]
>
> 518752         at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
> [kafka-clients-1.0.0.ja       r:?]
>
> 518753         at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472)
> [kafka-cli       ents-1.0.0.jar:?]
>
> 518754         at
> org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441)
> [kafka-clients-1.0.0.jar:?]
>
> 518755         at
> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464)
> [storm-kafka-client-1.2.1.jar:1.2.1]
>
> 518756         at
> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
> [storm-kafka-client-1.2.1.jar:1.2.1]
>
> 518757         at
> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
> [storm-kafka-client-1.2.1.jar:1.2.1]
>
> 518758         at
> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
> [storm-core-1.2.1.jar:1.2.1]
>
> 518759         at
> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
> [storm-core-1.2.1.jar:1.2.1]
>
> 518760         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>
> 518761         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
>
> 518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, groupId=mat       chgrp002] Node 2147483645
> disconnected.
>
> 518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
> Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer
> clientId=consumer-1, groupId=matc       hgrp002] Connection to node
> 2147483645 could not be established. Broker may not be available.
>
> 518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1,        groupId=matchgrp002] Cancelled OFFSET_FETCH
> request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3,
> clientId=consumer-1, correlationId=       377898) with correlation id
> 377898 due to node 2147483645 being disconnected
>
> 518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>
> 518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, groupId=mat       chgrp002] Sending metadata request
> (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id: 3
> rack: null)
>
> 518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>
> 518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata
> version 7 to Cluster(i       d = 0u-o-B0qTBGm7pCQ550QBw, nodes = [
> 170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null),
> 170.0.0.46:9092 (id: 3 rack:        null)], partitions = [Partition(topic
> = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2],
> offlineReplicas = [])])
>
> 518769 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
> response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0,
> disconnected=false, requ
> estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
> clientId=consumer-1, correlationId=377900),
> responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0,
> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518770 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518771 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>
> 518772 2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>
> 518773 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
> clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
> response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1,
> disconnected=false, requ
> estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
> clientId=consumer-1, correlationId=377901),
> responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0,
> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518774 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator
> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
> clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
> 170.0.0.39:9092 (id: 2147483645 rack: null)
>
>
>

Reply via email to