Sorry, the topology setup is actually:
> TridentTopology topology = new TridentTopology();
> BrokerHosts zk = new ZkHosts("localhost");
> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
On Aug 5, 2014, at 9:56 AM, Sa Li <[email protected]> wrote:
> Thank you very much, Marcelo. It did indeed work, and now I can run my code
> without getting the error. However, another thing keeps bothering me. Here is
> my code:
>
> public static class PrintStream implements Filter {
>
>     @SuppressWarnings("rawtypes")
>     @Override
>     public void prepare(Map conf, TridentOperationContext context) {
>     }
>
>     @Override
>     public void cleanup() {
>     }
>
>     @Override
>     public boolean isKeep(TridentTuple tuple) {
>         System.out.println(tuple);
>         return true;
>     }
> }
>
> public static StormTopology buildTopology(LocalDRPC drpc) throws IOException {
>
>     TridentTopology topology = new TridentTopology();
>     BrokerHosts zk = new ZkHosts("localhost");
>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>     OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>
>     topology.newStream("kafka", spout)
>             .each(new Fields("str"), new PrintStream());
>
>     return topology.build();
> }
>
> public static void main(String[] args) throws Exception {
>
>     Config conf = new Config();
>     conf.setDebug(true);
>     conf.setMaxSpoutPending(1);
>     conf.setMaxTaskParallelism(3);
>     LocalDRPC drpc = new LocalDRPC();
>     LocalCluster cluster = new LocalCluster();
>     cluster.submitTopology("kafka", conf, buildTopology(drpc));
>
>     Thread.sleep(100);
>     cluster.shutdown();
> }
>
> What I expect is quite simple: print out the messages I collect from a Kafka
> producer playback process that is running separately. The topic is listed as:
>
> root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper localhost:2181
> topic: topictest partition: 0 leader: 1 replicas: 1,3,2 isr: 1,3,2
> topic: topictest partition: 1 leader: 2 replicas: 2,1,3 isr: 2,1,3
> topic: topictest partition: 2 leader: 3 replicas: 3,2,1 isr: 3,2,1
> topic: topictest partition: 3 leader: 1 replicas: 1,2,3 isr: 1,2,3
> topic: topictest partition: 4 leader: 2 replicas: 2,3,1 isr: 2,3,1
>
> When I run the code, this is what I see on the screen. There seems to be no
> error, but no messages are printed either:
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-0.9.0.1
> -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file= -cp
> /etc/storm-0.9.0.1/storm-netty-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-core-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-console-logging-0.9.0.1.jar:/etc/storm-0.9.0.1/lib/log4j-over-slf4j-1.6.6.jar:/etc/storm-0.9.0.1/lib/commons-io-1.4.jar:/etc/storm-0.9.0.1/lib/joda-time-2.0.jar:/etc/storm-0.9.0.1/lib/tools.nrepl-0.2.3.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5.jar:/etc/storm-0.9.0.1/lib/curator-framework-1.0.1.jar:/etc/storm-0.9.0.1/lib/core.incubator-0.1.0.jar:/etc/storm-0.9.0.1/lib/jetty-6.1.26.jar:/etc/storm-0.9.0.1/lib/commons-codec-1.4.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5-20081211.jar:/etc/storm-0.9.0.1/lib/httpclient-4.1.1.jar:/etc/storm-0.9.0.1/lib/commons-exec-1.1.jar:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar:/etc/storm-0.9.0.1/lib/libthrift7-0.7.0-2.jar:/etc/storm-0.9.0.1/lib/minlog-1.2.jar:/etc/storm-0.9.0.1/lib/clojure-complete-0.2.3.jar:/etc/storm-0.9.0.1/lib/clojure-1.4.0.jar:/etc/storm-0.9.0.1/lib/asm-4.0.jar:/etc/storm-0.9.0.1/lib/mockito-all-1.9.5.jar:/etc/storm-0.9.0.1/lib/commons-fileupload-1.2.1.jar:/etc/storm-0.9.0.1/lib/clout-1.0.1.jar:/etc/storm-0.9.0.1/lib/ring-servlet-0.3.11.jar:/etc/storm-0.9.0.1/lib/ring-devel-0.3.11.jar:/etc/storm-0.9.0.1/lib/jgrapht-0.8.3.jar:/etc/storm-0.9.0.1/lib/snakeyaml-1.11.jar:/etc/storm-0.9.0.1/lib/reflectasm-1.07-shaded.jar:/etc/storm-0.9.0.1/lib/kryo-2.17.jar:/etc/storm-0.9.0.1/lib/ring-jetty-adapter-0.3.11.jar:/etc/storm-0.9.0.1/lib/compojure-1.1.3.jar:/etc/storm-0.9.0.1/lib/objenesis-1.2.jar:/etc/storm-0.9.0.1/lib/commons-logging-1.1.1.jar:/etc/storm-0.9.0.1/lib/tools.macro-0.1.0.jar:/etc/storm-0.9.0.1/lib/junit-3.8.1.jar:/etc/storm-0.9.0.1/lib/json-simple-1.1.jar:/etc/storm-0.9.0.1/lib/tools.cli-0.2.2.jar:/etc/storm-0.9.0.1/lib/curator-client-1.0.1.jar:/etc/storm-0.9.0.1/lib/jline-0.9.94.jar:/etc/storm-0.9.0.1/lib/zookeeper-3.3.3.jar:/etc/storm-0.9.0.1/lib/guava-13.0.jar:/etc/storm-0.9.0.1/lib/commons-lang-2.5.jar:/etc/storm-0.9.0.1/lib/carbonite-1.5.0.jar:/etc/storm-0.9.0.1/lib/ring-core-1.1.5.jar:/et
c/storm-0.9.0.1/lib/jzmq-2.1.0.jar:/etc/storm-0.9.0.1/lib/hiccup-0.3.6.jar:/etc/storm-0.9.0.1/lib/tools.logging-0.2.3.jar:/etc/storm-0.9.0.1/lib/kafka_2.9.2-0.8.0.jar:/etc/storm-0.9.0.1/lib/clj-stacktrace-0.2.2.jar:/etc/storm-0.9.0.1/lib/math.numeric-tower-0.0.1.jar:/etc/storm-0.9.0.1/lib/slf4j-api-1.6.5.jar:/etc/storm-0.9.0.1/lib/netty-3.6.3.Final.jar:/etc/storm-0.9.0.1/lib/disruptor-2.10.1.jar:/etc/storm-0.9.0.1/lib/jetty-util-6.1.26.jar:/etc/storm-0.9.0.1/lib/httpcore-4.1.jar:/etc/storm-0.9.0.1/lib/logback-core-1.0.6.jar:/etc/storm-0.9.0.1/lib/clj-time-0.4.1.jar:target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/storm-0.9.0.1/conf:/etc/storm-0.9.0.1/bin
>
> -Dstorm.jar=target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar
> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> 1113 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper at
> port 2000 and dir /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
> 1216 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with conf
> {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
> "topology.tick.tuple.freq.secs" nil,
> "topology.builtin.metrics.bucket.size.secs" 60,
> "topology.fall.back.on.java.serialization" true,
> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
> "topology.skip.missing.kryo.registrations" true,
> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m",
> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs"
> 10, "logviewer.childopts" "-Xmx128m", "java.library.path"
> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size"
> 1024, "storm.local.dir" "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9",
> "storm.messaging.netty.buffer_size" 5242880,
> "supervisor.worker.start.timeout.secs" 120,
> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128",
> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000,
> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size"
> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm",
> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable"
> true, "storm.messaging.netty.server_worker_threads" 1,
> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root"
> "/transactional", "topology.acker.executors" nil,
> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
> "supervisor.heartbeat.frequency.secs" 5,
> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30,
> "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy",
> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000,
> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator"
> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports"
> [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120,
> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30,
> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts"
> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
> "backtype.storm.serialization.types.ListDelegateSerializer",
> "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy",
> "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000,
> "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory",
> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1,
> "storm.zookeeper.retry.times" 5, "storm.thrift.transport"
> "backtype.storm.security.auth.SimpleTransportPlugin",
> "topology.state.synchronization.timeout.secs" 60,
> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600,
> "storm.messaging.transport" "backtype.storm.messaging.zmq",
> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000,
> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
> "topology.optimize" true, "topology.max.task.parallelism" nil}
> 1219 [main] INFO backtype.storm.daemon.nimbus - Using default scheduler
> 1237 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1303 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state
> update: :connected:none
> 1350 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1417 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1432 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state
> update: :connected:none
> 1482 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1484 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1532 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state
> update: :connected:none
> 1540 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1568 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with
> conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
> "topology.tick.tuple.freq.secs" nil,
> "topology.builtin.metrics.bucket.size.secs" 60,
> "topology.fall.back.on.java.serialization" true,
> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
> "topology.skip.missing.kryo.registrations" true,
> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m",
> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs"
> 10, "logviewer.childopts" "-Xmx128m", "java.library.path"
> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size"
> 1024, "storm.local.dir" "/tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388",
> "storm.messaging.netty.buffer_size" 5242880,
> "supervisor.worker.start.timeout.secs" 120,
> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128",
> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000,
> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size"
> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm",
> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable"
> true, "storm.messaging.netty.server_worker_threads" 1,
> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root"
> "/transactional", "topology.acker.executors" nil,
> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
> "supervisor.heartbeat.frequency.secs" 5,
> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30,
> "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy",
> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000,
> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator"
> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1
> 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120,
> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30,
> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts"
> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
> "backtype.storm.serialization.types.ListDelegateSerializer",
> "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy",
> "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000,
> "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory",
> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1,
> "storm.zookeeper.retry.times" 5, "storm.thrift.transport"
> "backtype.storm.security.auth.SimpleTransportPlugin",
> "topology.state.synchronization.timeout.secs" 60,
> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600,
> "storm.messaging.transport" "backtype.storm.messaging.zmq",
> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000,
> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
> "topology.optimize" true, "topology.max.task.parallelism" nil}
> 1576 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1582 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state
> update: :connected:none
> 1590 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1632 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor with
> id 944e6152-ca58-4d2b-8325-94ac98f43995 at host DO-mq-dev
> 1636 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with
> conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
> "topology.tick.tuple.freq.secs" nil,
> "topology.builtin.metrics.bucket.size.secs" 60,
> "topology.fall.back.on.java.serialization" true,
> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
> "topology.skip.missing.kryo.registrations" true,
> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m",
> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs"
> 10, "logviewer.childopts" "-Xmx128m", "java.library.path"
> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size"
> 1024, "storm.local.dir" "/tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912",
> "storm.messaging.netty.buffer_size" 5242880,
> "supervisor.worker.start.timeout.secs" 120,
> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128",
> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000,
> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size"
> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm",
> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable"
> true, "storm.messaging.netty.server_worker_threads" 1,
> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root"
> "/transactional", "topology.acker.executors" nil,
> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
> "supervisor.heartbeat.frequency.secs" 5,
> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30,
> "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy",
> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000,
> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator"
> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (4
> 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120,
> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30,
> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts"
> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
> "backtype.storm.serialization.types.ListDelegateSerializer",
> "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy",
> "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000,
> "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory",
> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1,
> "storm.zookeeper.retry.times" 5, "storm.thrift.transport"
> "backtype.storm.security.auth.SimpleTransportPlugin",
> "topology.state.synchronization.timeout.secs" 60,
> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600,
> "storm.messaging.transport" "backtype.storm.messaging.zmq",
> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000,
> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
> "topology.optimize" true, "topology.max.task.parallelism" nil}
> 1638 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1648 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state
> update: :connected:none
> 1690 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
> Starting
> 1740 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor with
> id e8303ca7-9cc4-4551-8387-7559fc3c53fc at host DO-mq-dev
> 1944 [main] INFO backtype.storm.daemon.nimbus - Received topology submission
> for kafka with conf {"topology.max.task.parallelism" nil,
> "topology.acker.executors" nil, "topology.kryo.register"
> {"storm.trident.topology.TransactionAttempt" nil}, "topology.kryo.decorators"
> (), "topology.name" "kafka", "storm.id" "kafka-1-1407257070",
> "topology.debug" true}
> 1962 [main] INFO backtype.storm.daemon.nimbus - Activating kafka:
> kafka-1-1407257070
> 2067 [main] INFO backtype.storm.scheduler.EvenScheduler - Available slots:
> (["944e6152-ca58-4d2b-8325-94ac98f43995" 1]
> ["944e6152-ca58-4d2b-8325-94ac98f43995" 2]
> ["944e6152-ca58-4d2b-8325-94ac98f43995" 3]
> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 4]
> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 5]
> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 6])
> 2088 [main] INFO backtype.storm.daemon.nimbus - Setting new assignment for
> topology id kafka-1-1407257070:
> #backtype.storm.daemon.common.Assignment{:master-code-dir
> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9/nimbus/stormdist/kafka-1-1407257070",
> :node->host {"944e6152-ca58-4d2b-8325-94ac98f43995" "DO-mq-dev"},
> :executor->node+port {[3 3] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [5 5]
> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [4 4]
> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [2 2]
> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [1 1]
> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1]}, :executor->start-time-secs {[1
> 1] 1407257070, [2 2] 1407257070, [4 4] 1407257070, [5 5] 1407257070, [3 3]
> 1407257070}}
> 2215 [main] INFO backtype.storm.daemon.nimbus - Shutting down master
> 2223 [main] INFO backtype.storm.daemon.nimbus - Shut down master
> 2239 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor
> 944e6152-ca58-4d2b-8325-94ac98f43995
> 2240 [Thread-6] INFO backtype.storm.event - Event manager interrupted
> 2241 [Thread-7] INFO backtype.storm.event - Event manager interrupted
> 2248 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor
> e8303ca7-9cc4-4551-8387-7559fc3c53fc
> 2248 [Thread-9] INFO backtype.storm.event - Event manager interrupted
> 2248 [Thread-10] INFO backtype.storm.event - Event manager interrupted
> 2256 [main] INFO backtype.storm.testing - Shutting down in process zookeeper
> 2257 [main] INFO backtype.storm.testing - Done shutting down in process
> zookeeper
> 2258 [main] INFO backtype.storm.testing - Deleting temporary path
> /tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9
> 2259 [main] INFO backtype.storm.testing - Deleting temporary path
> /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
> 2260 [main] INFO backtype.storm.testing - Deleting temporary path
> /tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388
> 2261 [main] INFO backtype.storm.testing - Deleting temporary path
> /tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912
>
> Can anyone help me locate the problem? I really need to get through
> this step so that I can replace .each(printStream()) with other
> functions.
>
>
> Thanks
>
> Alec
>
> On Aug 4, 2014, at 4:24 AM, Marcelo Valle <[email protected]> wrote:
>
>> Hello,
>>
>> You can inspect your application .jar with the command "jar tf" to see whether
>> the class kafka/api/OffsetRequest.class is part of the jar.
>> If it is not, you can try copying kafka_2.9.2-0.8.0.jar (or the version you
>> are using) into Storm's lib directory.
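
As a rough programmatic equivalent of the "jar tf" check above, here is a minimal Java sketch that uses only the JDK's java.util.jar API. The names JarClassCheck and containsClass are made up for illustration; they are not part of Storm or Kafka, and the jar path and class name are whatever you pass in.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarClassCheck {

    /**
     * Returns true if the jar at jarPath has an entry for the given
     * fully qualified class name (e.g. "kafka.api.OffsetRequest").
     */
    static boolean containsClass(String jarPath, String className) throws IOException {
        // Jar entries use '/' separators and end in ".class".
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarClassCheck <jar> <fully.qualified.ClassName>");
            return;
        }
        System.out.println(containsClass(args[0], args[1]) ? "found" : "missing");
    }
}
```

It could be run against the jar-with-dependencies artifact with kafka.api.OffsetRequest as the second argument.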
>>
>> Marcelo
>>
>>
>> 2014-07-31 23:33 GMT+02:00 Sa Li <[email protected]>:
>> Hi, all
>>
>> I am running Kafka spout code on a Storm server. The relevant part of the pom is:
>>
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka_2.9.2</artifactId>
>>   <version>0.8.0</version>
>>   <scope>provided</scope>
>>   <exclusions>
>>     <exclusion>
>>       <groupId>org.apache.zookeeper</groupId>
>>       <artifactId>zookeeper</artifactId>
>>     </exclusion>
>>     <exclusion>
>>       <groupId>log4j</groupId>
>>       <artifactId>log4j</artifactId>
>>     </exclusion>
>>   </exclusions>
>> </dependency>
>>
>> <!-- Storm-Kafka compiled -->
>> <dependency>
>>   <artifactId>storm-kafka</artifactId>
>>   <groupId>org.apache.storm</groupId>
>>   <version>0.9.2-incubating</version>
>>   <scope>compile</scope>
>> </dependency>
>>
>> I can build it with mvn package, but when I run it:
>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-bitmap# storm jar
>> target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>>
>>
>> I get this error:
>>
>> 1657 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl -
>> Starting
>> 1682 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor
>> with id a66e0c61-a951-4c1b-a43f-3fb0d12cb226 at host DO-mq-dev
>> 1698 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread
>> Thread[main,5,main] died
>> java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
>> at storm.artemis.kafka.KafkaConfig.<init>(KafkaConfig.java:26)
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.artemis.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:13)
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.artemis.KafkaConsumerTopology.buildTopology(KafkaConsumerTopology.java:115)
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> at
>> storm.artemis.KafkaConsumerTopology.main(KafkaConsumerTopology.java:144)
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>> Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_55]
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_55]
>> at java.security.AccessController.doPrivileged(Native Method)
>> ~[na:1.7.0_55]
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> ~[na:1.7.0_55]
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_55]
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> ~[na:1.7.0_55]
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_55]
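
The ClassNotFoundException in the trace above says that kafka.api.OffsetRequest was not visible to the class loader at the point where the topology was being built. A small probe along the following lines, a sketch that relies only on Class.forName from the JDK, could be run with the same classpath to confirm whether a given class is visible before submitting. ClasspathProbe and isOnClasspath are hypothetical names used here for illustration only.

```java
public class ClasspathProbe {

    /**
     * Returns true if the named class can be located by this class's
     * class loader. The class is not initialized, only looked up.
     */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but one of its own dependencies is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        for (String name : args) {
            System.out.println(name + ": " + (isOnClasspath(name) ? "visible" : "NOT visible"));
        }
    }
}
```

Passing kafka.api.OffsetRequest as an argument would report whether that class can be loaded in the environment where the probe runs.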
>>
>>
>>
>>
>> I tried searching online but could not find a solution. Any ideas
>> about this?
>>
>>
>> Thanks
>>
>> Alec
>>
>>
>>
>>
>