Hello Marcelo,
I am now able to print out the messages received from the producer. What I
did was make the versions consistent across the various clusters. First I
updated Storm to 0.9.2-incubating; checking the jars in ./storm/lib shows
that its ZooKeeper version is 3.4.5. I also upgraded Kafka to 0.8.1.1,
compiled against Scala 2.9.2. In the pom, I exclude ZooKeeper from the Kafka
dependency, as follows:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <scope>compile</scope>
    <!-- exclude the zookeeper package from Kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.jms</groupId>
            <artifactId>jms</artifactId>
        </exclusion>
    </exclusions>
</dependency>
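One way to confirm that an exclusion like this took effect is to ask the JVM
which jar a class is actually loaded from. The following is only a minimal
sketch: the class name WhichJar and its locationOf helper are made up for
illustration, and it assumes you run it on the project's dependency classpath
(inside a jar-with-dependencies every class would report the same jar).

```java
import java.security.CodeSource;

final class WhichJar {
    /** Returns the jar or directory a class is loaded from, or a short note. */
    static String locationOf(String className) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class<?> c = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "(bootstrap / unknown)"; // typical for JDK classes
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Default names are the ZooKeeper client class and the Kafka class from the stack trace.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.zookeeper.ZooKeeper",
            "kafka.javaapi.consumer.SimpleConsumer"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOf(name));
        }
    }
}
```

If the ZooKeeper class resolves to a jar other than the 3.4.5 one that Storm
ships, the exclusion is probably not being applied where you expect.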
For storm-kafka, I use two packages:
<dependency>
    <artifactId>storm-kafka</artifactId>
    <groupId>org.apache.storm</groupId>
    <version>0.9.2-incubating</version>
    <!--
    <scope>compile</scope>
    -->
    <!-- exclude the zookeeper package from storm-Kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.0-wip16a-scala292</version>
    <!-- exclude the zookeeper package from storm-Kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
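Since two storm-kafka artifacts are declared, they may well ship classes
under the same package, in which case the classpath order decides which copy
wins. A rough sketch for spotting such duplicates follows; the class name
DuplicateClassCheck and its helpers are hypothetical, and it assumes the
unshaded dependency jars are on the classpath, since a jar-with-dependencies
has already merged any duplicates.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

final class DuplicateClassCheck {
    /** Converts a class name such as a.b.C into the resource path a/b/C.class. */
    static String resourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Lists every classpath location that provides the given class file. */
    static List<URL> copiesOf(String className) {
        try {
            ClassLoader loader = DuplicateClassCheck.class.getClassLoader();
            Enumeration<URL> found = loader.getResources(resourceName(className));
            List<URL> urls = new ArrayList<>();
            while (found.hasMoreElements()) {
                urls.add(found.nextElement());
            }
            return urls;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Default is a storm-kafka class taken from the stack trace in this thread.
        String name = args.length > 0 ? args[0] : "storm.kafka.DynamicPartitionConnections";
        List<URL> copies = copiesOf(name);
        System.out.println(name + ": " + copies.size() + " copy/copies found");
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

More than one location in the output means two jars provide the same class,
and only one of them is actually used at runtime.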
In my experience, anyone starting out with storm-kafka really needs to check
for version conflicts among the different platforms (Storm, Kafka, ZooKeeper,
and Scala).
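The NoSuchMethodError quoted below names the descriptor
SimpleConsumer.<init>(Ljava/lang/String;III)V, which is a constructor taking
(String, int, int, int). A quick way to test for this kind of mismatch before
deploying is a reflection check like the sketch below. The class name
ConstructorCheck and the hasConstructor helper are made up for illustration,
and it assumes the Kafka jar you intend to deploy is on the classpath.

```java
final class ConstructorCheck {
    /** True if the class is loadable and has a public constructor with these parameter types. */
    static boolean hasConstructor(String className, Class<?>... paramTypes) {
        try {
            // initialize=false: inspect the class without running its static initializers
            Class<?> c = Class.forName(className, false, ConstructorCheck.class.getClassLoader());
            c.getConstructor(paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // (Ljava/lang/String;III)V corresponds to a constructor taking (String, int, int, int)
        boolean present = hasConstructor(
            "kafka.javaapi.consumer.SimpleConsumer",
            String.class, int.class, int.class, int.class);
        System.out.println("SimpleConsumer(String, int, int, int) present: " + present);
    }
}
```

A false result means the Kafka jar on the classpath is missing, or does not
match the version the spout was compiled against.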
Thanks
Alec
On Tue, Aug 12, 2014 at 2:01 AM, Marcelo Valle <[email protected]> wrote:
> Hello Alec,
>
> Which version of Kafka are you using? 0.7.2?
> And which Storm version? 0.8.1?
>
> The consumer you are using is prepared for kafka 0.7.2 and Storm 0.8.1.
>
> If you are using Storm 0.9.1 (or 0.9.2) and Kafka 0.8.0 (or 0.8.1.1) you
> can try this example:
> https://github.com/mvalleavila/Storm-0.9.1-Kafka-0.8-Test
>
> I need to update some versions in pom.xml, but it is working correctly with
> current versions.
>
> If you want to use the native storm-kafka connector included in version
> 0.9.2, you can try this patch to avoid ClassNotFound errors:
>
>
> https://github.com/buildoop/buildoop/blob/development/recipes/storm/storm-0.9.2_openbus-0.0.1-r1/rpm/sources/storm-kafka-dependencies.patch
>
> Regards
>
> Marcelo
>
>
>
>
> 2014-08-05 23:04 GMT+02:00 Sa Li <[email protected]>:
>
>> Hi, all
>>
>> This is a follow-up to my other thread, “kafka-spout running error”, in
>> which I described being unable to run a Kafka consumer. Here I run
>> jabbaugh’s consumer (https://github.com/jabbaugh/kafka-storm-consumer)
>> with KAFKA_DOMAIN=127.0.0.1, KAFKA_PORT=9092, and KAFKA_TOPIC=topictest.
>> Then I run:
>> storm jar target/storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>> storm.example.trident.ExampleTopology ExampleTopology
>>
>> Then I got an error similar to the one in my last post, not able to connect to zookeeper:
>>
>> 3593 [Thread-9] INFO backtype.storm.daemon.worker - Worker
>> ca4d20b6-9015-4400-bac3-e219975c310f for storm ExampleTrident-1-1407272043
>> on 0e61159b-6701-400a-9d73-d21c84102e37:4 has finished loading
>> 3614 [Thread-26-$mastercoord-bg0] INFO
>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
>> 3626 [Thread-26-$mastercoord-bg0] INFO backtype.storm.daemon.executor -
>> Opened spout $mastercoord-bg0:(1)
>> 3629 [Thread-26-$mastercoord-bg0] INFO backtype.storm.daemon.executor -
>> Activating spout $mastercoord-bg0:(1)
>> 3660 [Thread-22-spout0] ERROR backtype.storm.util - Async loop died!
>> java.lang.NoSuchMethodError:
>> kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
>> at
>> storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33)
>> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28)
>> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80)
>> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60)
>> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
>> ~[storm-core-0.9.0.1.jar:na]
>> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
>> ~[storm-core-0.9.0.1.jar:na]
>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
>> 3660 [Thread-22-spout0] ERROR backtype.storm.daemon.executor -
>> java.lang.NoSuchMethodError:
>> kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
>> at
>> storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33)
>> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28)
>> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80)
>> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60)
>> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>> ~[storm-core-0.9.0.1.jar:na]
>> at
>> backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
>> ~[storm-core-0.9.0.1.jar:na]
>> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
>> ~[storm-core-0.9.0.1.jar:na]
>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
>> 3719 [Thread-22-spout0] INFO backtype.storm.util - Halting process:
>> ("Worker died")
>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-consumer#
>>
>> Can anyone point out what the problem is? Thanks.
>>
>> Alec
>>
>
>