Hi, all

This is a follow-up message related to my another thread “kafka-spout running 
error” in which I described unable to run kafka consumer. Here I run jabbaugh’s 
consumer (https://github.com/jabbaugh/kafka-storm-consumer) , here I set 
KAFKA_DOMAIN=127.0.0.1, KAFKA_PORT=9092, KAFKA_TOPIC=topictest. Then I run 
 storm jar target/storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
storm.example.trident.ExampleTopology ExampleTopology

Then I got similar error as my last post, not able to connect to zookeeper

3593 [Thread-9] INFO  backtype.storm.daemon.worker - Worker 
ca4d20b6-9015-4400-bac3-e219975c310f for storm ExampleTrident-1-1407272043 on 
0e61159b-6701-400a-9d73-d21c84102e37:4 has finished loading
3614 [Thread-26-$mastercoord-bg0] INFO  
com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
3626 [Thread-26-$mastercoord-bg0] INFO  backtype.storm.daemon.executor - Opened 
spout $mastercoord-bg0:(1)
3629 [Thread-26-$mastercoord-bg0] INFO  backtype.storm.daemon.executor - 
Activating spout $mastercoord-bg0:(1)
3660 [Thread-22-spout0] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: 
kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
        at 
storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33)
 ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at 
storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28)
 ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at 
storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80)
 ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at 
storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60)
 ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at 
storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108)
 ~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66)
 ~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93)
 ~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104)
 ~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65) 
~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43) 
~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) 
~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
 ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403) 
~[storm-core-0.9.0.1.jar:na]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
3660 [Thread-22-spout0] ERROR backtype.storm.daemon.executor - 
java.lang.NoSuchMethodError: 
kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
        at 
storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33)
 ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at 
storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28)
 ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at 
storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80)
 ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at 
storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60)
 ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
        at 
storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108)
 ~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66)
 ~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93)
 ~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104)
 ~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65) 
~[storm-core-0.9.0.1.jar:na]
        at 
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43) 
~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
 ~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) 
~[storm-core-0.9.0.1.jar:na]
        at 
backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
 ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403) 
~[storm-core-0.9.0.1.jar:na]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
3719 [Thread-22-spout0] INFO  backtype.storm.util - Halting process: ("Worker 
died")
root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-consumer# 

Can anyone point me what the problem is? thanks

Alec

Reply via email to