If I set the sleep time to 1000 milliseconds, I get this error:

3067 [main] INFO  backtype.storm.testing - Deleting temporary path 
/tmp/0f1851f1-9499-48a5-817e-41712921d054
3163 [Thread-10-EventThread] INFO  
com.netflix.curator.framework.state.ConnectionStateManager - State change: 
SUSPENDED
3163 [ConnectionStateManager-0] WARN  
com.netflix.curator.framework.state.ConnectionStateManager - There are no 
ConnectionStateListeners registered.
3164 [Thread-10-EventThread] WARN  backtype.storm.cluster - Received event 
:disconnected::none: with disconnected Zookeeper.
3636 [Thread-10-SendThread(localhost:2000)] WARN  
org.apache.zookeeper.ClientCnxn - Session 0x147a7c868ef000b for server null, 
unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
~[na:1.7.0_55]
        at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) 
~[na:1.7.0_55]
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119) 
~[zookeeper-3.3.3.jar:3.3.3-1073969]
4877 [Thread-10-SendThread(localhost:2000)] WARN  
org.apache.zookeeper.ClientCnxn - Session 0x147a7c868ef000b for server null, 
unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
~[na:1.7.0_55]
        at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) 
~[na:1.7.0_55]
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119) 
~[zookeeper-3.3.3.jar:3.3.3-1073969]
5566 [Thread-10-SendThread(localhost:2000)] WARN  
org.apache.zookeeper.ClientCnxn - Session 0x147a7c868ef000b for server null, 
unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused

It seems it is not even connected to ZooKeeper. Is there any way to confirm 
the ZooKeeper connection?
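
One way to check whether a ZooKeeper server is reachable at all, independent of Storm and Curator, is to send it the "ruok" four-letter command over a plain socket; a healthy server answers "imok". Below is a minimal sketch using only the JDK. The class name ZkProbe and the 2-second timeout are made up for illustration, and the default port 2181 is simply the one used in the kafka-list-topic command elsewhere in this thread (the in-process ZooKeeper that the local cluster starts listens on port 2000 and is gone once the cluster shuts down). Newer ZooKeeper releases may require four-letter commands to be whitelisted, so treat a missing reply as inconclusive there.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Storm, Curator, or ZooKeeper.
class ZkProbe {

    /**
     * Sends the "ruok" four-letter command and returns the raw reply.
     * Returns null if the server cannot be reached or does not answer in time.
     */
    public static String ruok(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);

            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();

            InputStream in = socket.getInputStream();
            byte[] buf = new byte[16];
            int n = in.read(buf);
            return n <= 0 ? "" : new String(buf, 0, n, StandardCharsets.US_ASCII);
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, and similar failures.
            return null;
        }
    }

    /** A healthy server is expected to answer exactly "imok". */
    public static boolean isHealthy(String reply) {
        return reply != null && reply.trim().equals("imok");
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2181; // assumed port
        String reply = ruok(host, port, 2000);
        System.out.println(host + ":" + port + " -> "
                + (reply == null ? "unreachable" : "reply '" + reply + "'")
                + ", healthy=" + isHealthy(reply));
    }
}
```

Running it as "java ZkProbe localhost 2181" while the Kafka ZooKeeper is up, and again against port 2000 while the topology is still running, should show which of the two servers the "Connection refused" messages refer to.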

Thanks a lot

Alec

On Aug 5, 2014, at 12:58 PM, Sa Li <[email protected]> wrote:

> Thank you very much for your reply, Taylor. I tried increasing the sleep 
> time to 1 sec or 10 sec, but I got the error below; it seems to be an 
> "Async loop died" error. Any idea about that?
> 
> 3053 [Thread-19-$spoutcoord-spout0] INFO  
> org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> 3058 [Thread-19-$spoutcoord-spout0] ERROR backtype.storm.util - Async loop 
> died!
> java.lang.NoSuchMethodError: 
> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>         at 
> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.ConnectionState.start(ConnectionState.java:103) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.Coordinator.<init>(Coordinator.java:16) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getCoordinator(OpaqueTridentKafkaSpout.java:29)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator.<init>(OpaquePartitionedTridentSpoutExecutor.java:27)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getCoordinator(OpaquePartitionedTridentSpoutExecutor.java:166)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.TridentSpoutCoordinator.prepare(TridentSpoutCoordinator.java:38)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:26) 
> ~[storm-core-0.9.0.1.jar:na]
>         at 
> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) 
> ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) 
> ~[storm-core-0.9.0.1.jar:na]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>         at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
> 3058 [Thread-25-spout0] ERROR backtype.storm.util - Async loop died!
> java.lang.NoSuchMethodError: 
> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>         at 
> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.ConnectionState.start(ConnectionState.java:103) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.TridentKafkaEmitter.<init>(TridentKafkaEmitter.java:44)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getEmitter(OpaqueTridentKafkaSpout.java:24)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:69)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:171)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:20)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:43)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:214)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) 
> ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) 
> ~[storm-core-0.9.0.1.jar:na]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>         at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
> 3059 [Thread-19-$spoutcoord-spout0] ERROR backtype.storm.daemon.executor -
> java.lang.NoSuchMethodError: 
> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>         at 
> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.ConnectionState.start(ConnectionState.java:103) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.Coordinator.<init>(Coordinator.java:16) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getCoordinator(OpaqueTridentKafkaSpout.java:29)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator.<init>(OpaquePartitionedTridentSpoutExecutor.java:27)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getCoordinator(OpaquePartitionedTridentSpoutExecutor.java:166)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.TridentSpoutCoordinator.prepare(TridentSpoutCoordinator.java:38)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:26) 
> ~[storm-core-0.9.0.1.jar:na]
>         at 
> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) 
> ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) 
> ~[storm-core-0.9.0.1.jar:na]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>         at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
> 3059 [Thread-25-spout0] ERROR backtype.storm.daemon.executor -
> java.lang.NoSuchMethodError: 
> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>         at 
> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.curator.ConnectionState.start(ConnectionState.java:103) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) 
> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.TridentKafkaEmitter.<init>(TridentKafkaEmitter.java:44)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getEmitter(OpaqueTridentKafkaSpout.java:24)
>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:69)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:171)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:20)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:43)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:214)
>  ~[storm-core-0.9.0.1.jar:na]
>         at 
> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) 
> ~[storm-core-0.9.0.1.jar:na]
>         at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) 
> ~[storm-core-0.9.0.1.jar:na]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>         at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
> 3059 [Thread-7] INFO  backtype.storm.daemon.worker - Worker has topology 
> config {"storm.id" "kafka-1-1407268492", "dev.zookeeper.path" 
> "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, 
> "topology.builtin.metrics.bucket.size.secs" 60, 
> "topology.fall.back.on.java.serialization" true, 
> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
> "topology.skip.missing.kryo.registrations" true, 
> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", 
> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 
> 10, "logviewer.childopts" "-Xmx128m", "java.library.path" 
> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size" 
> 1024, "storm.local.dir" "/tmp/ca948198-69df-440b-8acb-6dfc4db6c288", 
> "storm.messaging.netty.buffer_size" 5242880, 
> "supervisor.worker.start.timeout.secs" 120, 
> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", 
> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, 
> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 
> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", 
> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" 
> true, "storm.messaging.netty.server_worker_threads" 1, 
> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
> "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" 
> (), "topology.name" "kafka", "topology.transfer.buffer.size" 1024, 
> "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" 
> "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, 
> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
> "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", 
> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1 
> 2 3), "topology.debug" true, "nimbus.task.launch.secs" 120, 
> "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" 
> {"storm.trident.topology.TransactionAttempt" nil}, 
> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, 
> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 
> 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, 
> "topology.tuple.serializer" 
> "backtype.storm.serialization.types.ListDelegateSerializer", 
> "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", 
> "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, 
> "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", 
> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, 
> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
> "backtype.storm.security.auth.SimpleTransportPlugin", 
> "topology.state.synchronization.timeout.secs" 60, 
> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, 
> "storm.messaging.transport" "backtype.storm.messaging.zmq", 
> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
> "topology.optimize" true, "topology.max.task.parallelism" nil}
> 3059 [Thread-7] INFO  backtype.storm.daemon.worker - Worker 
> 64335058-7f94-447f-bc0a-5107084789a0 for storm kafka-1-1407268492 on 
> cf2964b3-7655-4a33-88a1-f6e0ceb6f9ed:1 has finished loading
> 3164 [Thread-29-$mastercoord-bg0] INFO  
> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
> 3173 [Thread-25-spout0] INFO  backtype.storm.util - Halting process: ("Worker 
> died")
> 3173 [Thread-19-$spoutcoord-spout0] INFO  backtype.storm.util - Halting 
> process: ("Worker died")
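
The NoSuchMethodError above names the four-argument constructor ZooKeeper(String, int, Watcher, boolean). The frames show org.apache.curator classes coming from the jar-with-dependencies, while the ZooKeeper jar in the other log lines is zookeeper-3.3.3. As far as I know that constructor only exists from the ZooKeeper 3.4 line onward, so this looks like a class path version conflict rather than a timing problem, but that is an assumption to verify. A minimal reflection sketch that could be run with the same class path as the topology; the class name ClasspathCheck and its helper methods are made up for illustration:

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper; uses only JDK reflection, so it compiles
// without ZooKeeper and reports what the runtime class path provides.
class ClasspathCheck {

    // Maps the two primitive names used here to their class objects.
    private static Class<?> load(String name) throws ClassNotFoundException {
        switch (name) {
            case "int":     return int.class;
            case "boolean": return boolean.class;
            default:        return Class.forName(name);
        }
    }

    /** True if className has a public constructor with exactly these parameter types. */
    public static boolean hasConstructor(String className, String... paramTypeNames) {
        try {
            Class<?>[] params = new Class<?>[paramTypeNames.length];
            for (int i = 0; i < params.length; i++) {
                params[i] = load(paramTypeNames[i]);
            }
            Class.forName(className).getConstructor(params);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    /** Location (jar or directory) the class was loaded from. */
    public static String loadedFrom(String className) {
        try {
            CodeSource src = Class.forName(className).getProtectionDomain().getCodeSource();
            return src == null ? "(bootstrap class path)" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on class path)";
        }
    }

    public static void main(String[] args) {
        String zk = "org.apache.zookeeper.ZooKeeper";
        System.out.println("ZooKeeper loaded from: " + loadedFrom(zk));
        System.out.println("has (String, int, Watcher, boolean) constructor: "
                + hasConstructor(zk, "java.lang.String", "int",
                                 "org.apache.zookeeper.Watcher", "boolean"));
    }
}
```

If the first line prints the zookeeper-3.3.3 jar and the second prints false, the Curator version bundled in the topology jar is probably expecting a newer ZooKeeper than the one Storm puts first on the class path.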
> 
> Thanks
> 
> Alec
> 
> On Aug 5, 2014, at 10:26 AM, P. Taylor Goetz <[email protected]> wrote:
> 
>> You are only sleeping for 100 milliseconds before shutting down the local 
>> cluster, which is probably not long enough for the topology to come up and 
>> start processing messages. Try increasing the sleep time to something like 
>> 10 seconds.
>> 
>> You can also reduce startup time with the following JVM flag:
>> 
>> -Djava.net.preferIPv4Stack=true
>> 
>> - Taylor
>> 
>> On Aug 5, 2014, at 1:16 PM, Sa Li <[email protected]> wrote:
>> 
>>> Sorry, the stormTopology:
>>> 
>>>>                         TridentTopology topology = new TridentTopology();
>>>>                    BrokerHosts zk = new ZkHosts("localhost");
>>>>                    TridentKafkaConfig spoutConf = new 
>>>> TridentKafkaConfig(zk, "topictest");
>>>>                    spoutConf.scheme = new SchemeAsMultiScheme(new 
>>>> StringScheme());
>>>>                    OpaqueTridentKafkaSpout spout = new 
>>>> OpaqueTridentKafkaSpout(spoutConf);
>>> 
>>> 
>>> 
>>> 
>>> On Aug 5, 2014, at 9:56 AM, Sa Li <[email protected]> wrote:
>>> 
>>>> Thank you very much, Marcelo, it indeed worked; now I can run my code 
>>>> without getting an error. However, another thing keeps bothering me. 
>>>> The following is my code:
>>>> 
>>>> public static class PrintStream implements Filter {
>>>> 
>>>>                    @SuppressWarnings("rawtypes")
>>>>                    @Override
>>>>                    public void prepare(Map conf, TridentOperationContext 
>>>> context) {
>>>>                    }
>>>>                    @Override
>>>>                    public void cleanup() {
>>>>                    }
>>>>                    @Override
>>>>                    public boolean isKeep(TridentTuple tuple) {
>>>>                            System.out.println(tuple);
>>>>                            return true;
>>>>                    }
>>>> }
>>>> public static StormTopology buildTopology(LocalDRPC drpc) throws 
>>>> IOException {
>>>>               
>>>>                    TridentTopology topology = new TridentTopology();
>>>>                    BrokerHosts zk = new ZkHosts("localhost");
>>>>                    TridentKafkaConfig spoutConf = new 
>>>> TridentKafkaConfig(zk, "ingest_test");
>>>>                    spoutConf.scheme = new SchemeAsMultiScheme(new 
>>>> StringScheme());
>>>>                    OpaqueTridentKafkaSpout spout = new 
>>>> OpaqueTridentKafkaSpout(spoutConf);
>>>>                              
>>>>                    topology.newStream("kafka", spout)  
>>>>                                          .each(new Fields("str"),
>>>>                                           new PrintStream()                
>>>>      
>>>>                    );
>>>>                   
>>>>                   return topology.build();
>>>> }
>>>> public static void main(String[] args) throws Exception {
>>>>             
>>>>                Config conf = new Config();
>>>>                conf.setDebug(true);
>>>>                     conf.setMaxSpoutPending(1);
>>>>                conf.setMaxTaskParallelism(3);
>>>>                LocalDRPC drpc = new LocalDRPC();
>>>>                LocalCluster cluster = new LocalCluster();
>>>>                cluster.submitTopology("kafka", conf, buildTopology(drpc)); 
>>>>     
>>>>                Thread.sleep(100);
>>>>                cluster.shutdown();   
>>>> } 
>>>> 
>>>> What I expect is quite simple: print out the messages I collect from a 
>>>> Kafka producer playback process that is running separately. The topic is 
>>>> listed as:
>>>> 
>>>> root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper 
>>>> localhost:2181
>>>> topic: topictest        partition: 0    leader: 1       replicas: 1,3,2 
>>>> isr: 1,3,2
>>>> topic: topictest        partition: 1    leader: 2       replicas: 2,1,3 
>>>> isr: 2,1,3
>>>> topic: topictest        partition: 2    leader: 3       replicas: 3,2,1 
>>>> isr: 3,2,1
>>>> topic: topictest        partition: 3    leader: 1       replicas: 1,2,3 
>>>> isr: 1,2,3
>>>> topic: topictest        partition: 4    leader: 2       replicas: 2,3,1 
>>>> isr: 2,3,1
>>>> 
>>>> When I run the code, this is what I see on the screen. There seems to be 
>>>> no error, but no messages are printed either:
>>>> 
>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>> SLF4J: Found binding in 
>>>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: Found binding in 
>>>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>>>> explanation.
>>>> Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-0.9.0.1 
>>>> -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file= 
>>>> -cp 
>>>> /etc/storm-0.9.0.1/storm-netty-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-core-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-console-logging-0.9.0.1.jar:/etc/storm-0.9.0.1/lib/log4j-over-slf4j-1.6.6.jar:/etc/storm-0.9.0.1/lib/commons-io-1.4.jar:/etc/storm-0.9.0.1/lib/joda-time-2.0.jar:/etc/storm-0.9.0.1/lib/tools.nrepl-0.2.3.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5.jar:/etc/storm-0.9.0.1/lib/curator-framework-1.0.1.jar:/etc/storm-0.9.0.1/lib/core.incubator-0.1.0.jar:/etc/storm-0.9.0.1/lib/jetty-6.1.26.jar:/etc/storm-0.9.0.1/lib/commons-codec-1.4.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5-20081211.jar:/etc/storm-0.9.0.1/lib/httpclient-4.1.1.jar:/etc/storm-0.9.0.1/lib/commons-exec-1.1.jar:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar:/etc/storm-0.9.0.1/lib/libthrift7-0.7.0-2.jar:/etc/storm-0.9.0.1/lib/minlog-1.2.jar:/etc/storm-0.9.0.1/lib/clojure-complete-0.2.3.jar:/etc/storm-0.9.0.1/lib/clojure-1.4.0.jar:/etc/storm-0.9.0.1/lib/asm-4.0.jar:/etc/storm-0.9.0.1/lib/mockito-all-1.9.5.jar:/etc/storm-0.9.0.1/lib/commons-fileupload-1.2.1.jar:/etc/storm-0.9.0.1/lib/clout-1.0.1.jar:/etc/storm-0.9.0.1/lib/ring-servlet-0.3.11.jar:/etc/storm-0.9.0.1/lib/ring-devel-0.3.11.jar:/etc/storm-0.9.0.1/lib/jgrapht-0.8.3.jar:/etc/storm-0.9.0.1/lib/snakeyaml-1.11.jar:/etc/storm-0.9.0.1/lib/reflectasm-1.07-shaded.jar:/etc/storm-0.9.0.1/lib/kryo-2.17.jar:/etc/storm-0.9.0.1/lib/ring-jetty-adapter-0.3.11.jar:/etc/storm-0.9.0.1/lib/compojure-1.1.3.jar:/etc/storm-0.9.0.1/lib/objenesis-1.2.jar:/etc/storm-0.9.0.1/lib/commons-logging-1.1.1.jar:/etc/storm-0.9.0.1/lib/tools.macro-0.1.0.jar:/etc/storm-0.9.0.1/lib/junit-3.8.1.jar:/etc/storm-0.9.0.1/lib/json-simple-1.1.jar:/etc/storm-0.9.0.1/lib/tools.cli-0.2.2.jar:/etc/storm-0.9.0.1/lib/curator-client-1.0.1.jar:/etc/storm-0.9.0.1/lib/jline-0.9.94.jar:/etc/storm-0.9.0.1/lib/zookeeper-3.3.3.jar:/etc/storm-0.9.0.1/lib/guava-13.0.jar:/etc/storm-0.9.0.1/lib/commons-lang-2.5.jar:/etc/storm-0.9.0.1/lib/carbonite-1.5.0.jar:/etc/storm-0.9.0.1/lib/ring-core-1.1.5.jar:
/etc/storm-0.9.0.1/lib/jzmq-2.1.0.jar:/etc/storm-0.9.0.1/lib/hiccup-0.3.6.jar:/etc/storm-0.9.0.1/lib/tools.logging-0.2.3.jar:/etc/storm-0.9.0.1/lib/kafka_2.9.2-0.8.0.jar:/etc/storm-0.9.0.1/lib/clj-stacktrace-0.2.2.jar:/etc/storm-0.9.0.1/lib/math.numeric-tower-0.0.1.jar:/etc/storm-0.9.0.1/lib/slf4j-api-1.6.5.jar:/etc/storm-0.9.0.1/lib/netty-3.6.3.Final.jar:/etc/storm-0.9.0.1/lib/disruptor-2.10.1.jar:/etc/storm-0.9.0.1/lib/jetty-util-6.1.26.jar:/etc/storm-0.9.0.1/lib/httpcore-4.1.jar:/etc/storm-0.9.0.1/lib/logback-core-1.0.6.jar:/etc/storm-0.9.0.1/lib/clj-time-0.4.1.jar:target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/storm-0.9.0.1/conf:/etc/storm-0.9.0.1/bin
>>>>  
>>>> -Dstorm.jar=target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>>>>  storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>> SLF4J: Found binding in 
>>>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: Found binding in 
>>>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>>>> explanation.
>>>> 1113 [main] INFO  backtype.storm.zookeeper - Starting inprocess zookeeper 
>>>> at port 2000 and dir /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
>>>> 1216 [main] INFO  backtype.storm.daemon.nimbus - Starting Nimbus with conf 
>>>> {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
>>>> "topology.tick.tuple.freq.secs" nil, 
>>>> "topology.builtin.metrics.bucket.size.secs" 60, 
>>>> "topology.fall.back.on.java.serialization" true, 
>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>>>> "topology.skip.missing.kryo.registrations" true, 
>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" 
>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" 
>>>> true, "topology.trident.batch.emit.interval.millis" 50, 
>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", 
>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", 
>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" 
>>>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9", 
>>>> "storm.messaging.netty.buffer_size" 5242880, 
>>>> "supervisor.worker.start.timeout.secs" 120, 
>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" 
>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, 
>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, 
>>>> "topology.executor.receive.buffer.size" 1024, 
>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", 
>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" 
>>>> true, "storm.messaging.netty.server_worker_threads" 1, 
>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>>>> "/transactional", "topology.acker.executors" nil, 
>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
>>>> "supervisor.heartbeat.frequency.secs" 5, 
>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
>>>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>>>> "topology.spout.wait.strategy" 
>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", 
>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, 
>>>> "nimbus.topology.validator" 
>>>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" 
>>>> [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 
>>>> 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 
>>>> 30, "task.refresh.poll.secs" 10, "topology.workers" 1, 
>>>> "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, 
>>>> "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, 
>>>> "topology.tuple.serializer" 
>>>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>>>> "topology.disruptor.wait.strategy" 
>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>>>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 
>>>> 3773, "logviewer.port" 8000, "zmq.threads" 1, 
>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
>>>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>>>> "topology.state.synchronization.timeout.secs" 60, 
>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 
>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>>>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>>>> 1219 [main] INFO  backtype.storm.daemon.nimbus - Using default scheduler
>>>> 1237 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1303 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>> update: :connected:none
>>>> 1350 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1417 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1432 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>> update: :connected:none
>>>> 1482 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1484 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1532 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>> update: :connected:none
>>>> 1540 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1568 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor 
>>>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
>>>> "topology.tick.tuple.freq.secs" nil, 
>>>> "topology.builtin.metrics.bucket.size.secs" 60, 
>>>> "topology.fall.back.on.java.serialization" true, 
>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>>>> "topology.skip.missing.kryo.registrations" true, 
>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" 
>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" 
>>>> true, "topology.trident.batch.emit.interval.millis" 50, 
>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", 
>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", 
>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" 
>>>> "/tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388", 
>>>> "storm.messaging.netty.buffer_size" 5242880, 
>>>> "supervisor.worker.start.timeout.secs" 120, 
>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" 
>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, 
>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, 
>>>> "topology.executor.receive.buffer.size" 1024, 
>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", 
>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" 
>>>> true, "storm.messaging.netty.server_worker_threads" 1, 
>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>>>> "/transactional", "topology.acker.executors" nil, 
>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
>>>> "supervisor.heartbeat.frequency.secs" 5, 
>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
>>>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>>>> "topology.spout.wait.strategy" 
>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", 
>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, 
>>>> "nimbus.topology.validator" 
>>>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" 
>>>> (1 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120, 
>>>> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, 
>>>> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" 
>>>> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
>>>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>>>> "topology.disruptor.wait.strategy" 
>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>>>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 
>>>> 3773, "logviewer.port" 8000, "zmq.threads" 1, 
>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
>>>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>>>> "topology.state.synchronization.timeout.secs" 60, 
>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 
>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>>>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>>>> 1576 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1582 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>> update: :connected:none
>>>> 1590 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1632 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor 
>>>> with id 944e6152-ca58-4d2b-8325-94ac98f43995 at host DO-mq-dev
>>>> 1636 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor 
>>>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
>>>> "topology.tick.tuple.freq.secs" nil, 
>>>> "topology.builtin.metrics.bucket.size.secs" 60, 
>>>> "topology.fall.back.on.java.serialization" true, 
>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>>>> "topology.skip.missing.kryo.registrations" true, 
>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" 
>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" 
>>>> true, "topology.trident.batch.emit.interval.millis" 50, 
>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", 
>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", 
>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" 
>>>> "/tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912", 
>>>> "storm.messaging.netty.buffer_size" 5242880, 
>>>> "supervisor.worker.start.timeout.secs" 120, 
>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" 
>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, 
>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, 
>>>> "topology.executor.receive.buffer.size" 1024, 
>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", 
>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" 
>>>> true, "storm.messaging.netty.server_worker_threads" 1, 
>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>>>> "/transactional", "topology.acker.executors" nil, 
>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
>>>> "supervisor.heartbeat.frequency.secs" 5, 
>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
>>>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>>>> "topology.spout.wait.strategy" 
>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", 
>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, 
>>>> "nimbus.topology.validator" 
>>>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" 
>>>> (4 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120, 
>>>> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, 
>>>> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" 
>>>> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
>>>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>>>> "topology.disruptor.wait.strategy" 
>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>>>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 
>>>> 3773, "logviewer.port" 8000, "zmq.threads" 1, 
>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
>>>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>>>> "topology.state.synchronization.timeout.secs" 60, 
>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 
>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>>>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>>>> 1638 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1648 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>> update: :connected:none
>>>> 1690 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>> - Starting
>>>> 1740 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor 
>>>> with id e8303ca7-9cc4-4551-8387-7559fc3c53fc at host DO-mq-dev
>>>> 1944 [main] INFO  backtype.storm.daemon.nimbus - Received topology 
>>>> submission for kafka with conf {"topology.max.task.parallelism" nil, 
>>>> "topology.acker.executors" nil, "topology.kryo.register" 
>>>> {"storm.trident.topology.TransactionAttempt" nil}, 
>>>> "topology.kryo.decorators" (), "topology.name" "kafka", "storm.id" 
>>>> "kafka-1-1407257070", "topology.debug" true}
>>>> 1962 [main] INFO  backtype.storm.daemon.nimbus - Activating kafka: 
>>>> kafka-1-1407257070
>>>> 2067 [main] INFO  backtype.storm.scheduler.EvenScheduler - Available 
>>>> slots: (["944e6152-ca58-4d2b-8325-94ac98f43995" 1] 
>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 2] 
>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 3] 
>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 4] 
>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 5] 
>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 6])
>>>> 2088 [main] INFO  backtype.storm.daemon.nimbus - Setting new assignment 
>>>> for topology id kafka-1-1407257070: 
>>>> #backtype.storm.daemon.common.Assignment{:master-code-dir 
>>>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9/nimbus/stormdist/kafka-1-1407257070",
>>>>  :node->host {"944e6152-ca58-4d2b-8325-94ac98f43995" "DO-mq-dev"}, 
>>>> :executor->node+port {[3 3] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [5 
>>>> 5] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [4 4] 
>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [2 2] 
>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [1 1] 
>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1]}, :executor->start-time-secs 
>>>> {[1 1] 1407257070, [2 2] 1407257070, [4 4] 1407257070, [5 5] 1407257070, 
>>>> [3 3] 1407257070}}
>>>> 2215 [main] INFO  backtype.storm.daemon.nimbus - Shutting down master
>>>> 2223 [main] INFO  backtype.storm.daemon.nimbus - Shut down master
>>>> 2239 [main] INFO  backtype.storm.daemon.supervisor - Shutting down 
>>>> supervisor 944e6152-ca58-4d2b-8325-94ac98f43995
>>>> 2240 [Thread-6] INFO  backtype.storm.event - Event manager interrupted
>>>> 2241 [Thread-7] INFO  backtype.storm.event - Event manager interrupted
>>>> 2248 [main] INFO  backtype.storm.daemon.supervisor - Shutting down 
>>>> supervisor e8303ca7-9cc4-4551-8387-7559fc3c53fc
>>>> 2248 [Thread-9] INFO  backtype.storm.event - Event manager interrupted
>>>> 2248 [Thread-10] INFO  backtype.storm.event - Event manager interrupted
>>>> 2256 [main] INFO  backtype.storm.testing - Shutting down in process 
>>>> zookeeper
>>>> 2257 [main] INFO  backtype.storm.testing - Done shutting down in process 
>>>> zookeeper
>>>> 2258 [main] INFO  backtype.storm.testing - Deleting temporary path 
>>>> /tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9
>>>> 2259 [main] INFO  backtype.storm.testing - Deleting temporary path 
>>>> /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
>>>> 2260 [main] INFO  backtype.storm.testing - Deleting temporary path 
>>>> /tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388
>>>> 2261 [main] INFO  backtype.storm.testing - Deleting temporary path 
>>>> /tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912
>>>> 
>>>> Can anyone help me locate the problem? I really need to get past this 
>>>> step so that I can replace .each(printStream()) with other functions.
>>>> 
>>>> 
>>>> Thanks
>>>> 
>>>> Alec
>>>>  
>>>> On Aug 4, 2014, at 4:24 AM, Marcelo Valle <[email protected]> wrote:
>>>> 
>>>>> Hello,
>>>>> 
>>>>> You can check your application .jar with the command "jar tf" to see 
>>>>> whether the class kafka/api/OffsetRequest.class is part of the jar.
>>>>> If it is not, you can try copying kafka_2.9.2-0.8.0.jar (or the version 
>>>>> you are using) into the storm_lib directory.
>>>>> 
>>>>> Marcelo
>>>>> 
>>>>> 
>>>>> 2014-07-31 23:33 GMT+02:00 Sa Li <[email protected]>:
>>>>> Hi, all
>>>>> 
>>>>> I am running kafka-spout code on storm-server; the pom contains:
>>>>> 
>>>>>         <dependency>
>>>>>             <groupId>org.apache.kafka</groupId>
>>>>>             <artifactId>kafka_2.9.2</artifactId>
>>>>>             <version>0.8.0</version>
>>>>>             <scope>provided</scope>
>>>>>             <exclusions>
>>>>>                 <exclusion>
>>>>>                     <groupId>org.apache.zookeeper</groupId>
>>>>>                     <artifactId>zookeeper</artifactId>
>>>>>                 </exclusion>
>>>>>                 <exclusion>
>>>>>                     <groupId>log4j</groupId>
>>>>>                     <artifactId>log4j</artifactId>
>>>>>                 </exclusion>
>>>>>             </exclusions>
>>>>>         </dependency>
>>>>> 
>>>>>         <!-- Storm-Kafka compiled -->
>>>>>         <dependency>
>>>>>             <artifactId>storm-kafka</artifactId>
>>>>>             <groupId>org.apache.storm</groupId>
>>>>>             <version>0.9.2-incubating</version>
>>>>>             <scope>compile</scope>
>>>>>         </dependency>
>>>>> 
>>>>> I can build it with mvn package, but when I run it with
>>>>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-bitmap# storm jar 
>>>>> target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
>>>>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>>>>> 
>>>>> I get this error:
>>>>> 
>>>>> 1657 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1682 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor 
>>>>> with id a66e0c61-a951-4c1b-a43f-3fb0d12cb226 at host DO-mq-dev
>>>>> 1698 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread 
>>>>> Thread[main,5,main] died
>>>>> java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
>>>>>   at storm.artemis.kafka.KafkaConfig.<init>(KafkaConfig.java:26) 
>>>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>   at 
>>>>> storm.artemis.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:13)
>>>>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>   at 
>>>>> storm.artemis.KafkaConsumerTopology.buildTopology(KafkaConsumerTopology.java:115)
>>>>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>   at 
>>>>> storm.artemis.KafkaConsumerTopology.main(KafkaConsumerTopology.java:144) 
>>>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>> Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
>>>>>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_55]
>>>>>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_55]
>>>>>   at java.security.AccessController.doPrivileged(Native Method) 
>>>>> ~[na:1.7.0_55]
>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
>>>>> ~[na:1.7.0_55]
>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_55]
>>>>>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 
>>>>> ~[na:1.7.0_55]
>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_55]
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> I tried searching online but could not find a solution. Any ideas?
>>>>> 
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> Alec
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
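Marcelo's "jar tf" check quoted above can also be done programmatically, which may be handy in a build script. The following is only a minimal sketch: the class name JarClassCheck and its containsClass helper are made up for illustration and are not part of Storm or Kafka. It looks for the entry kafka/api/OffsetRequest.class in a given jar. One possible reason for the entry to be missing, given the pom quoted above, is that the Kafka dependency is declared with scope "provided", which is typically left out of a jar-with-dependencies assembly.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Storm or Kafka): reports whether a jar
// contains the .class entry for a fully qualified class name.
public class JarClassCheck {

    static boolean containsClass(Path jar, String className) throws IOException {
        // Map e.g. kafka.api.OffsetRequest to kafka/api/OffsetRequest.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            // getJarEntry returns null when the jar has no such entry
            return jarFile.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java JarClassCheck <path-to-jar> <class.name>");
            System.exit(2);
        }
        boolean found = containsClass(Paths.get(args[0]), args[1]);
        System.out.println(args[1] + (found ? " found in " : " MISSING from ") + args[0]);
        // Non-zero exit code lets a shell script react to a missing class
        System.exit(found ? 0 : 1);
    }
}
```

After compiling with javac, it could be run against the jar from the thread as: java JarClassCheck target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar kafka.api.OffsetRequest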
