Hi, all
This is a follow-up message related to my another thread “kafka-spout running
error” in which I described unable to run kafka consumer. Here I run jabbaugh’s
consumer (https://github.com/jabbaugh/kafka-storm-consumer) , here I set
KAFKA_DOMAIN=127.0.0.1, KAFKA_PORT=9092, KAFKA_TOPIC=topictest. Then I run
storm jar target/storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar
storm.example.trident.ExampleTopology ExampleTopology
Then I got similar error as my last post, not able to connect to zookeeper
3593 [Thread-9] INFO backtype.storm.daemon.worker - Worker
ca4d20b6-9015-4400-bac3-e219975c310f for storm ExampleTrident-1-1407272043 on
0e61159b-6701-400a-9d73-d21c84102e37:4 has finished loading
3614 [Thread-26-$mastercoord-bg0] INFO
com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
3626 [Thread-26-$mastercoord-bg0] INFO backtype.storm.daemon.executor - Opened
spout $mastercoord-bg0:(1)
3629 [Thread-26-$mastercoord-bg0] INFO backtype.storm.daemon.executor -
Activating spout $mastercoord-bg0:(1)
3660 [Thread-22-spout0] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError:
kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
at
storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33)
~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at
storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28)
~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at
storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80)
~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at
storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60)
~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at
storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
~[storm-core-0.9.0.1.jar:na]
at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
~[storm-core-0.9.0.1.jar:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
3660 [Thread-22-spout0] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError:
kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
at
storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33)
~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at
storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28)
~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at
storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80)
~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at
storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60)
~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at
storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65)
~[storm-core-0.9.0.1.jar:na]
at
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
~[storm-core-0.9.0.1.jar:na]
at
backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
~[storm-core-0.9.0.1.jar:na]
at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
~[storm-core-0.9.0.1.jar:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
3719 [Thread-22-spout0] INFO backtype.storm.util - Halting process: ("Worker
died")
root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-consumer#
Can anyone point me what the problem is? thanks
Alec