Hi All,
I’m using Storm-Kafka-Client 1.2.1 to read from Kafka sever(1.0.0,
1.0.1).When Kafka topic leader progress is down, Storm Kafka Spout is stuck,
there is no responseon UI website ,even kakfa topic leader is alter to
another broker, It’s still stuck, until restart the kafka server progress.
Storm recovered from struk.
Is Storm-Kafka-Client 1.2.1 compatible with kafka-client 1.0.0/1.0.1?
Here’s some code and Storm log.Please help me with this issue.
Thanks a lot.
--------------------------------------------------------------
Code:
Kafka-client version is the same the kafka version(1.0.0,1.0.1).
Kafka is distribute on 3 brokers. There are 2 replicators and 1
partition for every Kafka topic.
KafkaSpout configureateion is as below. The topology read from just
one topic.
TopologyBuilder builder = new TopologyBuilder();
//kafka Servers IP
String bootstrapServers = properties.getProperty("bootstrap.servers");
//Kafka Spout consumer topic
String kafkaReaderTopic =
properties.getProperty("storm.kafka.reader.topic");
//KafkaSpout
KafkaSpoutConfig<String, String> config =
KafkaSpoutConfig.builder(bootstrapServers, kafkaReaderTopic)
.setProp(properties)
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIES
T)
.build();
//topology Spout,KAFKA_READER
builder.setSpout(BOLT_ID_KAFKA_READER, new KafkaSpout<>(config), 1);
--------------------------------------------------------------
Log:
518728 2018-03-26 22:42:25.023 o.a.k.c.Metadata
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata
version 6 to Cluster(i d = 0u-o-B0qTBGm7pCQ550QBw, nodes =
[170.0.0.46:9092 (id: 3 rack: null), 170.0.0.39:9092 (id: 2 rack: null),
170.0.0.38:9092 (id: 1 rack: null)], partitions = [Partition(topic =
tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2],
offlineReplicas = [])])
518729 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1,
disconnected=false, requ
estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
clientId=consumer-1, correlationId=377894), responseBody=FindCoordinatorRe
sponse(throttleTimeMs=0, errorMessage='null', error=NONE,
node=170.0.0.39:9092 (id: 2 rack: null)))
518730 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.
0.0.39:9092 (id: 2147483645 rack: null)
518731 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 (id: 2147483645 rack: null) dead
518732 2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.46:9092 (id: 3 rack: null)
518733 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0,
disconnected=false, requ
estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
clientId=consumer-1, correlationId=377896), responseBody=FindCoordinatorRe
sponse(throttleTimeMs=0, errorMessage='null', error=NONE,
node=170.0.0.39:9092 (id: 2 rack: null)))
518734 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.
0.0.39:9092 (id: 2147483645 rack: null)
518735 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 (id: 2147483645 rack: null) dead
518736 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.38:9092 (id: 1 rack: null)
518737 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0,
disconnected=false, requ
estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
clientId=consumer-1, correlationId=377897), responseBody=FindCoordinatorRe
sponse(throttleTimeMs=0, errorMessage='null', error=NONE,
node=170.0.0.39:9092 (id: 2 rack: null)))
518738 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.
0.0.39:9092 (id: 2147483645 rack: null)
518739 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, groupId=mat chgrp002] Initiating connection to
node 170.0.0.39:9092 (id: 2147483645 rack: null)
518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Fetching committed offsets
for partitions: [tradeMatch-0]
518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, groupId=matchg rp002] Connection with /170.0.0.39
disconnected
518742 java.net.ConnectException: Connection refused
518743 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
~[?:1.8.0_101]
518744 at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
~[?:1.8.0_101]
518745 at
org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(Plaint
extTransportLayer.java:50) ~[kafka-clients-1.0.0.j ar:?]
518746 at
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java
:106) ~[kafka-clients-1.0.0.jar:?]
518747 at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444
) [kafka-clients-1.0.0.jar:?]
518748 at
org.apache.kafka.common.network.Selector.poll(Selector.java:398)
[kafka-clients-1.0.0.jar:?]
518749 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.
java:460) [kafka-clients-1.0.0.jar:?]
518750 at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(Consu
merNetworkClient.java:238) [kafka-clients-1.0.0.ja r:?]
518751 at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(Consu
merNetworkClient.java:214) [kafka-clients-1.0.0.ja r:?]
518752 at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(Consu
merNetworkClient.java:174) [kafka-clients-1.0.0.ja r:?]
518753 at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommitt
edOffsets(ConsumerCoordinator.java:472) [kafka-cli ents-1.0.0.jar:?]
518754 at
org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java
:1441) [kafka-clients-1.0.0.jar:?]
518755 at
org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464
) [storm-kafka-client-1.2.1.jar:1.2.1]
518756 at
org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.j
ava:440) [storm-kafka-client-1.2.1.jar:1.2.1]
518757 at
org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
[storm-kafka-client-1.2.1.jar:1.2.1]
518758 at
org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.
clj:654) [storm-core-1.2.1.jar:1.2.1]
518759 at
org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.
2.1.jar:1.2.1]
518760 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
518761 at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, groupId=mat chgrp002] Node 2147483645
disconnected.
518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer
clientId=consumer-1, groupId=matc hgrp002] Connection to node
2147483645 could not be established. Broker may not be available.
518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH
request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3,
clientId=consumer-1, correlationId= 377898) with correlation id 377898
due to node 2147483645 being disconnected
518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 (id: 2147483645 rack: null) dead
518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, groupId=mat chgrp002] Sending metadata request
(type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id: 3
rack: null)
518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.38:9092 (id: 1 rack: null)
518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata
version 7 to Cluster(i d = 0u-o-B0qTBGm7pCQ550QBw, nodes =
[170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null),
170.0.0.46:9092 (id: 3 rack: null)], partitions = [Partition(topic =
tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2],
offlineReplicas = [])])
518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Fetching committed offsets
for partitions: [tradeMatch-0]
518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, groupId=matchg rp002] Connection with /170.0.0.39
disconnected
518742 java.net.ConnectException: Connection refused
518743 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
~[?:1.8.0_101]
518744 at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
~[?:1.8.0_101]
518745 at
org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(Plaint
extTransportLayer.java:50) ~[kafka-clients-1.0.0.j ar:?]
518746 at
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java
:106) ~[kafka-clients-1.0.0.jar:?]
518747 at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444
) [kafka-clients-1.0.0.jar:?]
518748 at
org.apache.kafka.common.network.Selector.poll(Selector.java:398)
[kafka-clients-1.0.0.jar:?]
518749 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.
java:460) [kafka-clients-1.0.0.jar:?]
518750 at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(Consu
merNetworkClient.java:238) [kafka-clients-1.0.0.ja r:?]
518751 at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(Consu
merNetworkClient.java:214) [kafka-clients-1.0.0.ja r:?]
518752 at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(Consu
merNetworkClient.java:174) [kafka-clients-1.0.0.ja r:?]
518753 at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommitt
edOffsets(ConsumerCoordinator.java:472) [kafka-cli ents-1.0.0.jar:?]
518754 at
org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java
:1441) [kafka-clients-1.0.0.jar:?]
518755 at
org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464
) [storm-kafka-client-1.2.1.jar:1.2.1]
518756 at
org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.j
ava:440) [storm-kafka-client-1.2.1.jar:1.2.1]
518757 at
org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
[storm-kafka-client-1.2.1.jar:1.2.1]
518758 at
org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.
clj:654) [storm-core-1.2.1.jar:1.2.1]
518759 at
org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.
2.1.jar:1.2.1]
518760 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
518761 at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, groupId=mat chgrp002] Node 2147483645
disconnected.
518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer
clientId=consumer-1, groupId=matc hgrp002] Connection to node
2147483645 could not be established. Broker may not be available.
518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH
request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3,
clientId=consumer-1, correlationId= 377898) with correlation id 377898
due to node 2147483645 being disconnected
518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 (id: 2147483645 rack: null) dead
518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, groupId=mat chgrp002] Sending metadata request
(type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id: 3
rack: null)
518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.38:9092 (id: 1 rack: null)
518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata
version 7 to Cluster(i d = 0u-o-B0qTBGm7pCQ550QBw, nodes =
[170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null),
170.0.0.46:9092 (id: 3 rack: null)], partitions = [Partition(topic =
tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2],
offlineReplicas = [])])
518769 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0,
disconnected=false, requ
estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
clientId=consumer-1, correlationId=377900), responseBody=FindCoordinatorRe
sponse(throttleTimeMs=0, errorMessage='null', error=NONE,
node=170.0.0.39:9092 (id: 2 rack: null)))
518770 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.
0.0.39:9092 (id: 2147483645 rack: null)
518771 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 (id: 2147483645 rack: null) dead
518772 2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.38:9092 (id: 1 rack: null)
518773 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
clientId=consumer-1, g roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1,
disconnected=false, requ
estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1,
clientId=consumer-1, correlationId=377901), responseBody=FindCoordinatorRe
sponse(throttleTimeMs=0, errorMessage='null', error=NONE,
node=170.0.0.39:9092 (id: 2 rack: null)))
518774 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator
Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
clientId=consumer-1, gr oupId=matchgrp002] Discovered coordinator 170.
0.0.39:9092 (id: 2147483645 rack: null)