You have two different versions of zookeeper on the classpath (or in your topology jar).
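One way to confirm this is to ask the JVM which jar it actually loads ZooKeeper from, and whether that copy has the constructor named in the NoSuchMethodError quoted below. The following is only a minimal sketch using plain JDK reflection. The class name ZkClasspathCheck and its helper methods are made up for illustration and are not part of Storm, Kafka, or ZooKeeper.

```java
import java.lang.reflect.Constructor;
import java.security.CodeSource;

// Hypothetical diagnostic helper; the names here are illustrative only.
final class ZkClasspathCheck {

    /** Returns the jar or directory a class was loaded from, or a placeholder if the JVM does not report one. */
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return src.getLocation().toString();
    }

    /** True if cls has a public constructor whose parameter type names match exactly, in order. */
    static boolean hasConstructor(Class<?> cls, String... paramTypeNames) {
        for (Constructor<?> c : cls.getConstructors()) {
            Class<?>[] params = c.getParameterTypes();
            if (params.length != paramTypeNames.length) {
                continue;
            }
            boolean match = true;
            for (int i = 0; i < params.length; i++) {
                if (!params[i].getName().equals(paramTypeNames[i])) {
                    match = false;
                    break;
                }
            }
            if (match) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        try {
            Class<?> zk = Class.forName("org.apache.zookeeper.ZooKeeper");
            System.out.println("ZooKeeper loaded from: " + locationOf(zk));
            // (Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V decodes to
            // a constructor taking (String, int, Watcher, boolean).
            boolean present = hasConstructor(zk,
                    "java.lang.String", "int", "org.apache.zookeeper.Watcher", "boolean");
            System.out.println("(String, int, Watcher, boolean) constructor present: " + present);
        } catch (ClassNotFoundException e) {
            System.out.println("org.apache.zookeeper.ZooKeeper is not on this classpath");
        }
    }
}
```

Run it with the same classpath the topology runs with. If it reports a jar you did not expect, or reports the constructor as missing, that jar is the copy to track down. Note that the launch command quoted further down lists /etc/storm-0.9.0.1/lib/zookeeper-3.3.3.jar, so an older ZooKeeper shadowing the one the Kafka spout was built against is a likely candidate, though that is an assumption until you check.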
You need to find out where the conflicting zookeeper dependency is sneaking in and exclude it. If you are using Maven, 'mvn dependency:tree' and exclusions will help.

-Taylor

> On Aug 6, 2014, at 6:14 PM, Sa Li <[email protected]> wrote:
>
> Thanks, Taylor, that makes sense. I checked my kafka config, the
> host.name=10.100.70.128, and correspondingly changed the spout config to
> BrokerHosts zk = new ZkHosts("10.100.70.128");
> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>
> It used to be localhost (actually localhost=10.100.70.128), so the spout listens to
> 10.100.70.128 and collects topictest, but I still get the same error:
>
> 3237 [Thread-19-$spoutcoord-spout0] ERROR backtype.storm.util - Async loop died!
> java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>
> thanks
>
> Alec
>
>
>> On Wed, Aug 6, 2014 at 1:27 PM, P. Taylor Goetz <[email protected]> wrote:
>> You are running in local mode, so Storm will start an in-process zookeeper
>> for its own use (usually on port 2000). In distributed mode, Storm will
>> connect to the zookeeper quorum specified in your storm.yaml.
>>
>> In local mode, you would only need the external zookeeper for kafka and the
>> kafka spout. When configuring the kafka spout, point it to the zookeeper
>> used by kafka.
>>
>> - Taylor
>>
>>
>>> On Aug 6, 2014, at 3:34 PM, Sa Li <[email protected]> wrote:
>>>
>>> Hi, Kushan
>>>
>>> You are completely right, I noticed this after you mentioned it. Apparently
>>> I am able to consume the messages with kafka-console-consumer.sh, which
>>> listens on 2181, but storm goes to 2000 instead.
>>>
>>> 1319 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper at port 2000 and dir /tmp/f41ad971-9f6b-433f-9dc9-9797afcc2e46
>>> 1425 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
>>>
>>> I spent the whole morning walking through my configuration. This is the zoo.cfg:
>>>
>>> # The number of milliseconds of each tick
>>> tickTime=2000
>>> # The number of ticks that the initial
>>> # synchronization phase can take
>>> initLimit=5
>>> # The number of ticks that can pass between
>>> # sending a request and getting an acknowledgement
>>> syncLimit=2
>>> # the directory where the snapshot is stored.
>>> dataDir=/var/lib/zookeeper
>>> # Place the dataLogDir to a separate physical disc for better performance
>>> # dataLogDir=/disk2/zookeeper
>>> # the port at which the clients will connect
>>> clientPort=2181
>>> # specify all zookeeper servers
>>> # The first port is used by followers to connect to the leader
>>> # The second one is used for leader election
>>> #server.1=zookeeper1:2888:3888
>>> #server.2=zookeeper2:2888:3888
>>> #server.3=zookeeper3:2888:3888
>>>
>>> # To avoid seeks ZooKeeper allocates space in the transaction log file in
>>> # blocks of preAllocSize kilobytes. The default block size is 64M. One reason
>>> # for changing the size of the blocks is to reduce the block size if snapshots
>>> # are taken more often. (Also, see snapCount).
>>> #preAllocSize=65536
>>> # Clients can submit requests faster than ZooKeeper can process them,
>>> # especially if there are a lot of clients. To prevent ZooKeeper from running
>>> # out of memory due to queued requests, ZooKeeper will throttle clients so that
>>> # there is no more than globalOutstandingLimit outstanding requests in the
>>> # system. The default limit is 1,000.
>>> # ZooKeeper logs transactions to a transaction log. After snapCount
>>> # transactions are written to a log file a snapshot is started and a new
>>> # transaction log file is started. The default snapCount is 10,000.
>>> #snapCount=1000
>>>
>>> # If this option is defined, requests will be logged to a trace file named
>>> # traceFile.year.month.day.
>>> #traceFile=
>>> # Leader accepts client connections. Default value is "yes". The leader machine
>>> # coordinates updates. For higher update throughput at the slight expense of
>>> # read throughput the leader can be configured to not accept clients and focus
>>> # on coordination.
>>> leaderServes=yes
>>> # Enable regular purging of old data and transaction logs every 24 hours
>>> autopurge.purgeInterval=24
>>> autopurge.snapRetainCount=5
>>>
>>> The only thing I thought to change was to make it a "multi-server" setup by
>>> uncommenting server.1, server.2, server.3, but that didn't help. And this is
>>> the storm.yaml sitting in ~/.storm:
>>>
>>> storm.zookeeper.servers:
>>>     - "10.100.70.128"
>>> #     - "server2"
>>> storm.zookeeper.port: 2181
>>> nimbus.host: "10.100.70.128"
>>> nimbus.childopts: "-Xmx1024m"
>>> storm.local.dir: "/app/storm"
>>> java.library.path: "/usr/lib/jvm/java-7-openjdk-amd64"
>>> supervisor.slots.ports:
>>>     - 6700
>>>     - 6701
>>>     - 6702
>>>     - 6703
>>> # ##### These may optionally be filled in:
>>> #
>>> ## List of custom serializations
>>> # topology.kryo.register:
>>> #     - org.mycompany.MyType
>>> #     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
>>> #
>>> ## List of custom kryo decorators
>>> # topology.kryo.decorators:
>>> #     - org.mycompany.MyDecorator
>>> #
>>> ## Locations of the drpc servers
>>> drpc.servers:
>>>     - "10.100.70.128"
>>> #     - "server2"
>>> drpc.port: 3772
>>> drpc.worker.threads: 64
>>> drpc.queue.size: 128
>>> drpc.invocations.port: 3773
>>> drpc.request.timeout.secs: 600
>>> drpc.childopts: "-Xmx768m"
>>> ## Metrics Consumers
>>> # topology.metrics.consumer.register:
>>> #   - class: "backtype.storm.metrics.LoggingMetricsConsumer"
>>> #     parallelism.hint: 1
>>> #   - class: "org.mycompany.MyMetricsConsumer"
>>> #     parallelism.hint: 1
>>> #     argument:
>>> #       - endpoint: "metrics-collector.mycompany.org"
>>>
>>> I really couldn't figure out what the trick is to configure ZK and the storm
>>> cluster, and why zookeeper listens on 2000, which is really weird.
>>>
>>> thanks
>>>
>>> Alec
>>>
>>>
>>>> On Wed, Aug 6, 2014 at 6:48 AM, Kushan Maskey <[email protected]> wrote:
>>>> I see that your zookeeper is listening on port 2000. Is that how you have
>>>> configured the zookeeper?
>>>>
>>>> --
>>>> Kushan Maskey
>>>> 817.403.7500
>>>>
>>>>
>>>>> On Tue, Aug 5, 2014 at 11:56 AM, Sa Li <[email protected]> wrote:
>>>>> Thank you very much, Marcelo, it indeed worked, now I can run my code
>>>>> without getting an error. However, another thing keeps bothering me.
>>>>> Following is my code:
>>>>>
>>>>> public static class PrintStream implements Filter {
>>>>>
>>>>>     @SuppressWarnings("rawtypes")
>>>>>     @Override
>>>>>     public void prepare(Map conf, TridentOperationContext context) {
>>>>>     }
>>>>>     @Override
>>>>>     public void cleanup() {
>>>>>     }
>>>>>     @Override
>>>>>     public boolean isKeep(TridentTuple tuple) {
>>>>>         System.out.println(tuple);
>>>>>         return true;
>>>>>     }
>>>>> }
>>>>> public static StormTopology buildTopology(LocalDRPC drpc) throws IOException {
>>>>>
>>>>>     TridentTopology topology = new TridentTopology();
>>>>>     BrokerHosts zk = new ZkHosts("localhost");
>>>>>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
>>>>>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>     OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>>>>>
>>>>>     topology.newStream("kafka", spout)
>>>>>             .each(new Fields("str"), new PrintStream());
>>>>>
>>>>>     return topology.build();
>>>>> }
>>>>> public static void main(String[] args) throws Exception {
>>>>>
>>>>>     Config conf = new Config();
>>>>>     conf.setDebug(true);
>>>>>     conf.setMaxSpoutPending(1);
>>>>>     conf.setMaxTaskParallelism(3);
>>>>>     LocalDRPC drpc = new LocalDRPC();
>>>>>     LocalCluster cluster = new LocalCluster();
>>>>>     cluster.submitTopology("kafka", conf, buildTopology(drpc));
>>>>>
>>>>>     Thread.sleep(100);
>>>>>     cluster.shutdown();
>>>>> }
>>>>>
>>>>> What I expect is quite simple: print out the messages I collect from a
>>>>> kafka producer playback process which is running separately. The topic is
>>>>> listed as:
>>>>>
>>>>> root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper localhost:2181
>>>>> topic: topictest partition: 0 leader: 1 replicas: 1,3,2 isr: 1,3,2
>>>>> topic: topictest partition: 1 leader: 2 replicas: 2,1,3 isr: 2,1,3
>>>>> topic: topictest partition: 2 leader: 3 replicas: 3,2,1 isr: 3,2,1
>>>>> topic: topictest partition: 3 leader: 1 replicas: 1,2,3 isr: 1,2,3
>>>>> topic: topictest partition: 4 leader: 2 replicas: 2,3,1 isr: 2,3,1
>>>>>
>>>>> When I run the code, this is what I see on the screen. There seems to be
>>>>> no error, but no messages are printed either:
>>>>>
>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>> SLF4J: Found binding in [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: Found binding in [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>>>>> Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-0.9.0.1 >>>>> -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file= >>>>> -cp >>>>> /etc/storm-0.9.0.1/storm-netty-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-core-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-console-logging-0.9.0.1.jar:/etc/storm-0.9.0.1/lib/log4j-over-slf4j-1.6.6.jar:/etc/storm-0.9.0.1/lib/commons-io-1.4.jar:/etc/storm-0.9.0.1/lib/joda-time-2.0.jar:/etc/storm-0.9.0.1/lib/tools.nrepl-0.2.3.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5.jar:/etc/storm-0.9.0.1/lib/curator-framework-1.0.1.jar:/etc/storm-0.9.0.1/lib/core.incubator-0.1.0.jar:/etc/storm-0.9.0.1/lib/jetty-6.1.26.jar:/etc/storm-0.9.0.1/lib/commons-codec-1.4.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5-20081211.jar:/etc/storm-0.9.0.1/lib/httpclient-4.1.1.jar:/etc/storm-0.9.0.1/lib/commons-exec-1.1.jar:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar:/etc/storm-0.9.0.1/lib/libthrift7-0.7.0-2.jar:/etc/storm-0.9.0.1/lib/minlog-1.2.jar:/etc/storm-0.9.0.1/lib/clojure-complete-0.2.3.jar:/etc/storm-0.9.0.1/lib/clojure-1.4.0.jar:/etc/storm-0.9.0.1/lib/asm-4.0.jar:/etc/storm-0.9.0.1/lib/mockito-all-1.9.5.jar:/etc/storm-0.9.0.1/lib/commons-fileupload-1.2.1.jar:/etc/storm-0.9.0.1/lib/clout-1.0.1.jar:/etc/storm-0.9.0.1/lib/ring-servlet-0.3.11.jar:/etc/storm-0.9.0.1/lib/ring-devel-0.3.11.jar:/etc/storm-0.9.0.1/lib/jgrapht-0.8.3.jar:/etc/storm-0.9.0.1/lib/snakeyaml-1.11.jar:/etc/storm-0.9.0.1/lib/reflectasm-1.07-shaded.jar:/etc/storm-0.9.0.1/lib/kryo-2.17.jar:/etc/storm-0.9.0.1/lib/ring-jetty-adapter-0.3.11.jar:/etc/storm-0.9.0.1/lib/compojure-1.1.3.jar:/etc/storm-0.9.0.1/lib/objenesis-1.2.jar:/etc/storm-0.9.0.1/lib/commons-logging-1.1.1.jar:/etc/storm-0.9.0.1/lib/tools.macro-0.1.0.jar:/etc/storm-0.9.0.1/lib/junit-3.8.1.jar:/etc/storm-0.9.0.1/lib/json-simple-1.1.jar:/etc/storm-0.9.0.1/lib/tools.cli-0.2.2.jar:/etc/storm-0.9.0.1/lib/curator-client-1.0.1.jar:/etc/storm-0.9.0.1/lib/jline-0.9.94.jar:/etc/storm-0.9.0.1/lib/zookeeper-3.3.3.jar:/
etc/storm-0.9.0.1/lib/guava-13.0.jar:/etc/storm-0.9.0.1/lib/commons-lang-2.5.jar:/etc/storm-0.9.0.1/lib/carbonite-1.5.0.jar:/etc/storm-0.9.0.1/lib/ring-core-1.1.5.jar:/etc/storm-0.9.0.1/lib/jzmq-2.1.0.jar:/etc/storm-0.9.0.1/lib/hiccup-0.3.6.jar:/etc/storm-0.9.0.1/lib/tools.logging-0.2.3.jar:/etc/storm-0.9.0.1/lib/kafka_2.9.2-0.8.0.jar:/etc/storm-0.9.0.1/lib/clj-stacktrace-0.2.2.jar:/etc/storm-0.9.0.1/lib/math.numeric-tower-0.0.1.jar:/etc/storm-0.9.0.1/lib/slf4j-api-1.6.5.jar:/etc/storm-0.9.0.1/lib/netty-3.6.3.Final.jar:/etc/storm-0.9.0.1/lib/disruptor-2.10.1.jar:/etc/storm-0.9.0.1/lib/jetty-util-6.1.26.jar:/etc/storm-0.9.0.1/lib/httpcore-4.1.jar:/etc/storm-0.9.0.1/lib/logback-core-1.0.6.jar:/etc/storm-0.9.0.1/lib/clj-time-0.4.1.jar:target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/storm-0.9.0.1/conf:/etc/storm-0.9.0.1/bin >>>>> >>>>> -Dstorm.jar=target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar >>>>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology >>>>> SLF4J: Class path contains multiple SLF4J bindings. >>>>> SLF4J: Found binding in >>>>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>> SLF4J: Found binding in >>>>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>>>> explanation. 
>>>>> 1113 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper >>>>> at port 2000 and dir /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879 >>>>> 1216 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with >>>>> conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >>>>> "topology.tick.tuple.freq.secs" nil, >>>>> "topology.builtin.metrics.bucket.size.secs" 60, >>>>> "topology.fall.back.on.java.serialization" true, >>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >>>>> "topology.skip.missing.kryo.registrations" true, >>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" >>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" >>>>> true, "topology.trident.batch.emit.interval.millis" 50, >>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", >>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", >>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" >>>>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9", >>>>> "storm.messaging.netty.buffer_size" 5242880, >>>>> "supervisor.worker.start.timeout.secs" 120, >>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" >>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, >>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, >>>>> "topology.executor.receive.buffer.size" 1024, >>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", >>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" >>>>> true, "storm.messaging.netty.server_worker_threads" 1, >>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >>>>> "/transactional", "topology.acker.executors" nil, >>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >>>>> "drpc.queue.size" 128, 
"worker.childopts" "-Xmx768m", >>>>> "supervisor.heartbeat.frequency.secs" 5, >>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" >>>>> 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >>>>> "topology.spout.wait.strategy" >>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", >>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, >>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, >>>>> "nimbus.topology.validator" >>>>> "backtype.storm.nimbus.DefaultTopologyValidator", >>>>> "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, >>>>> "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, >>>>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, >>>>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", >>>>> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >>>>> "backtype.storm.serialization.types.ListDelegateSerializer", >>>>> "topology.disruptor.wait.strategy" >>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >>>>> "backtype.storm.serialization.DefaultKryoFactory", >>>>> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, >>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" >>>>> "backtype.storm.security.auth.SimpleTransportPlugin", >>>>> "topology.state.synchronization.timeout.secs" 60, >>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" >>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", >>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >>>>> "topology.optimize" true, "topology.max.task.parallelism" nil} >>>>> 1219 [main] INFO backtype.storm.daemon.nimbus - Using default scheduler >>>>> 1237 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1303 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1350 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1417 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1432 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1482 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1484 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1532 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1540 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1568 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor >>>>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >>>>> "topology.tick.tuple.freq.secs" nil, >>>>> "topology.builtin.metrics.bucket.size.secs" 60, >>>>> "topology.fall.back.on.java.serialization" true, >>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >>>>> "topology.skip.missing.kryo.registrations" true, >>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" >>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" >>>>> true, "topology.trident.batch.emit.interval.millis" 50, >>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", >>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", >>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" >>>>> 
"/tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388", >>>>> "storm.messaging.netty.buffer_size" 5242880, >>>>> "supervisor.worker.start.timeout.secs" 120, >>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" >>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, >>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, >>>>> "topology.executor.receive.buffer.size" 1024, >>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", >>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" >>>>> true, "storm.messaging.netty.server_worker_threads" 1, >>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >>>>> "/transactional", "topology.acker.executors" nil, >>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >>>>> "supervisor.heartbeat.frequency.secs" 5, >>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" >>>>> 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >>>>> "topology.spout.wait.strategy" >>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", >>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, >>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, >>>>> "nimbus.topology.validator" >>>>> "backtype.storm.nimbus.DefaultTopologyValidator", >>>>> "supervisor.slots.ports" (1 2 3), "topology.debug" false, >>>>> "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, >>>>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, >>>>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", >>>>> 
"nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >>>>> "backtype.storm.serialization.types.ListDelegateSerializer", >>>>> "topology.disruptor.wait.strategy" >>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >>>>> "backtype.storm.serialization.DefaultKryoFactory", >>>>> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, >>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" >>>>> "backtype.storm.security.auth.SimpleTransportPlugin", >>>>> "topology.state.synchronization.timeout.secs" 60, >>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" >>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", >>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" >>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >>>>> "topology.optimize" true, "topology.max.task.parallelism" nil} >>>>> 1576 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1582 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1590 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1632 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor >>>>> with id 944e6152-ca58-4d2b-8325-94ac98f43995 at host DO-mq-dev >>>>> 1636 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor >>>>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >>>>> "topology.tick.tuple.freq.secs" nil, >>>>> "topology.builtin.metrics.bucket.size.secs" 60, >>>>> "topology.fall.back.on.java.serialization" true, >>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >>>>> 
"topology.skip.missing.kryo.registrations" true, >>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" >>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" >>>>> true, "topology.trident.batch.emit.interval.millis" 50, >>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", >>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", >>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" >>>>> "/tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912", >>>>> "storm.messaging.netty.buffer_size" 5242880, >>>>> "supervisor.worker.start.timeout.secs" 120, >>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" >>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, >>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, >>>>> "topology.executor.receive.buffer.size" 1024, >>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", >>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" >>>>> true, "storm.messaging.netty.server_worker_threads" 1, >>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >>>>> "/transactional", "topology.acker.executors" nil, >>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >>>>> "supervisor.heartbeat.frequency.secs" 5, >>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" >>>>> 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >>>>> "topology.spout.wait.strategy" >>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", >>>>> "topology.max.spout.pending" nil, 
"storm.zookeeper.retry.interval" 1000, >>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, >>>>> "nimbus.topology.validator" >>>>> "backtype.storm.nimbus.DefaultTopologyValidator", >>>>> "supervisor.slots.ports" (4 5 6), "topology.debug" false, >>>>> "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, >>>>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, >>>>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", >>>>> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >>>>> "backtype.storm.serialization.types.ListDelegateSerializer", >>>>> "topology.disruptor.wait.strategy" >>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >>>>> "backtype.storm.serialization.DefaultKryoFactory", >>>>> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, >>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" >>>>> "backtype.storm.security.auth.SimpleTransportPlugin", >>>>> "topology.state.synchronization.timeout.secs" 60, >>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" >>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", >>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" >>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >>>>> "topology.optimize" true, "topology.max.task.parallelism" nil} >>>>> 1638 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1648 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1690 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1740 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor 
>>>>> with id e8303ca7-9cc4-4551-8387-7559fc3c53fc at host DO-mq-dev >>>>> 1944 [main] INFO backtype.storm.daemon.nimbus - Received topology >>>>> submission for kafka with conf {"topology.max.task.parallelism" nil, >>>>> "topology.acker.executors" nil, "topology.kryo.register" >>>>> {"storm.trident.topology.TransactionAttempt" nil}, >>>>> "topology.kryo.decorators" (), "topology.name" "kafka", "storm.id" >>>>> "kafka-1-1407257070", "topology.debug" true} >>>>> 1962 [main] INFO backtype.storm.daemon.nimbus - Activating kafka: >>>>> kafka-1-1407257070 >>>>> 2067 [main] INFO backtype.storm.scheduler.EvenScheduler - Available >>>>> slots: (["944e6152-ca58-4d2b-8325-94ac98f43995" 1] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 2] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 3] >>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 4] >>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 5] >>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 6]) >>>>> 2088 [main] INFO backtype.storm.daemon.nimbus - Setting new assignment >>>>> for topology id kafka-1-1407257070: >>>>> #backtype.storm.daemon.common.Assignment{:master-code-dir >>>>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9/nimbus/stormdist/kafka-1-1407257070", >>>>> :node->host {"944e6152-ca58-4d2b-8325-94ac98f43995" "DO-mq-dev"}, >>>>> :executor->node+port {[3 3] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], >>>>> [5 5] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [4 4] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [2 2] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [1 1] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1]}, :executor->start-time-secs >>>>> {[1 1] 1407257070, [2 2] 1407257070, [4 4] 1407257070, [5 5] 1407257070, >>>>> [3 3] 1407257070}} >>>>> 2215 [main] INFO backtype.storm.daemon.nimbus - Shutting down master >>>>> 2223 [main] INFO backtype.storm.daemon.nimbus - Shut down master >>>>> 2239 [main] INFO backtype.storm.daemon.supervisor - Shutting down >>>>> supervisor 944e6152-ca58-4d2b-8325-94ac98f43995 >>>>> 2240 
[Thread-6] INFO backtype.storm.event - Event manager interrupted
>>>>> 2241 [Thread-7] INFO backtype.storm.event - Event manager interrupted
>>>>> 2248 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor e8303ca7-9cc4-4551-8387-7559fc3c53fc
>>>>> 2248 [Thread-9] INFO backtype.storm.event - Event manager interrupted
>>>>> 2248 [Thread-10] INFO backtype.storm.event - Event manager interrupted
>>>>> 2256 [main] INFO backtype.storm.testing - Shutting down in process zookeeper
>>>>> 2257 [main] INFO backtype.storm.testing - Done shutting down in process zookeeper
>>>>> 2258 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9
>>>>> 2259 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
>>>>> 2260 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388
>>>>> 2261 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912
>>>>>
>>>>> Can anyone help me locate what the problem is? I really need to get
>>>>> through this step in order to be able to replace .each(printStream())
>>>>> with other functions.
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> Alec
>>>>>
>>>>>> On Aug 4, 2014, at 4:24 AM, Marcelo Valle <[email protected]> wrote:
>>>>>>
>>>>>> hello,
>>>>>>
>>>>>> you can check your .jar application with the command "jar tf" to see if
>>>>>> class kafka/api/OffsetRequest.class is part of the jar.
>>>>>> If not, you can try to copy kafka-2.9.2-0.8.0.jar (or the version you are
>>>>>> using) into the storm_lib directory.
>>>>>>
>>>>>> Marcelo
>>>>>>
>>>>>>
>>>>>> 2014-07-31 23:33 GMT+02:00 Sa Li <[email protected]>:
>>>>>>> Hi, all
>>>>>>>
>>>>>>> I am running kafka-spout code on a storm server. The pom is:
>>>>>>>
>>>>>>> <dependency>
>>>>>>>     <groupId>org.apache.kafka</groupId>
>>>>>>>     <artifactId>kafka_2.9.2</artifactId>
>>>>>>>     <version>0.8.0</version>
>>>>>>>     <scope>provided</scope>
>>>>>>>
>>>>>>>     <exclusions>
>>>>>>>         <exclusion>
>>>>>>>             <groupId>org.apache.zookeeper</groupId>
>>>>>>>             <artifactId>zookeeper</artifactId>
>>>>>>>         </exclusion>
>>>>>>>         <exclusion>
>>>>>>>             <groupId>log4j</groupId>
>>>>>>>             <artifactId>log4j</artifactId>
>>>>>>>         </exclusion>
>>>>>>>     </exclusions>
>>>>>>>
>>>>>>> </dependency>
>>>>>>>
>>>>>>> <!-- Storm-Kafka compiled -->
>>>>>>>
>>>>>>> <dependency>
>>>>>>>     <artifactId>storm-kafka</artifactId>
>>>>>>>     <groupId>org.apache.storm</groupId>
>>>>>>>     <version>0.9.2-incubating</version>
>>>>>>>     <scope>compile</scope>
>>>>>>> </dependency>
>>>>>>>
>>>>>>> I can mvn package it, but when I run it
>>>>>>>
>>>>>>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-bitmap# storm jar target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>>>>>>>
>>>>>>> I get this error:
>>>>>>>
>>>>>>> 1657 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
>>>>>>> 1682 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id a66e0c61-a951-4c1b-a43f-3fb0d12cb226 at host DO-mq-dev
>>>>>>> 1698 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread Thread[main,5,main] died
>>>>>>> java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
>>>>>>>     at storm.artemis.kafka.KafkaConfig.<init>(KafkaConfig.java:26) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>>>     at storm.artemis.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:13) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>>>     at storm.artemis.KafkaConsumerTopology.buildTopology(KafkaConsumerTopology.java:115) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>>>     at storm.artemis.KafkaConsumerTopology.main(KafkaConsumerTopology.java:144) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>>> Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
>>>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_55]
>>>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_55]
>>>>>>>     at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_55]
>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354) ~[na:1.7.0_55]
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_55]
>>>>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) ~[na:1.7.0_55]
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_55]
>>>>>>>
>>>>>>> I tried poking around online but could not find a solution for it. Any idea
>>>>>>> about that?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Alec
