Hi all,

I had a similar issue, but I am using storm-kafka:
https://github.com/apache/incubator-storm/tree/master/external/storm-kafka. It
uses TridentTopology, which is convenient for parsing messages when the tuples
are JSON. Here is my code:

public static class PrintStream implements Filter {

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.out.println(tuple);
        return true;
    }
}
public static StormTopology buildTopology(LocalDRPC drpc) throws IOException {
    TridentTopology topology = new TridentTopology();
    BrokerHosts zk = new ZkHosts("localhost");
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

    topology.newStream("kafka", spout)
            .each(new Fields("str"), new PrintStream());

    return topology.build();
}
  
public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setDebug(true);
    conf.setMaxSpoutPending(1);
    conf.setMaxTaskParallelism(3);
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafka", conf, buildTopology(drpc));

    Thread.sleep(100);
    cluster.shutdown();
}
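
As an aside, since I mentioned JSON above: here is a rough sketch (not part of
my actual topology) of a Trident function that would parse the "str" field (the
single output field that StringScheme declares) as JSON. I am assuming
json-simple is on the classpath; ParseJson and the "id" key are just
illustrative names:

public static class ParseJson extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // tuple.getString(0) is the raw Kafka message, i.e. the "str" field.
        Object parsed = JSONValue.parse(tuple.getString(0));
        if (parsed instanceof JSONObject) {
            collector.emit(new Values(((JSONObject) parsed).get("id")));
        }
        // A tuple whose payload is not a JSON object is simply not emitted,
        // which in Trident drops it from the stream.
    }
}

It would be wired in with something like:

topology.newStream("kafka", spout)
        .each(new Fields("str"), new ParseJson(), new Fields("id"));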
               
I was expecting it to print the messages generated by a Kafka producer (I can
read them with kafka-console-consumer), but although the code runs, nothing is
displayed on screen.
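
For completeness, this is roughly how I confirm the topic has data (a sketch
assuming Kafka 0.8's console consumer script and my "ingest_test" topic):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ingest_test --from-beginning

That prints the messages fine. Here is the console output from the Storm run: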
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: 
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: 
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: 
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf 
{"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
"topology.tick.tuple.freq.secs" nil, 
"topology.builtin.metrics.bucket.size.secs" 60, 
"topology.fall.back.on.java.serialization" true, 
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
"topology.skip.missing.kryo.registrations" true, 
"storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", 
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
"topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 
10, "logviewer.childopts" "-Xmx128m", "java.library.path" 
"/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 
1024, "storm.local.dir" 
"/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//724c7721-297a-42aa-87b1-7c3bc8a4dd24",
 "storm.messaging.netty.buffer_size" 5242880, 
"supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" 
true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 
3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, 
"nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, 
"storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, 
"topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" 
nil, "storm.zookeeper.root" "/storm", 
"storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, 
"storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" 
["localhost"], "transactional.zookeeper.root" "/transactional", 
"topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, 
"topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" 
"-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, 
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
"supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
"topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
"topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", 
"topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
"topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
"backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1 2 
3), "topology.debug" false, "nimbus.task.launch.secs" 120, 
"nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, 
"task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" 
"-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
"worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
"backtype.storm.serialization.types.ListDelegateSerializer", 
"topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", 
"nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, 
"topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", 
"drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, 
"storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
"backtype.storm.security.auth.SimpleTransportPlugin", 
"topology.state.synchronization.timeout.secs" 60, 
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, 
"storm.messaging.transport" "backtype.storm.messaging.zmq", 
"logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
"drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, 
"nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
"topology.optimize" true, "topology.max.task.parallelism" nil}
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: 
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id 
ee70aeee-1404-446b-97b1-2f432ec42a3d at host 192.168.128.10
[main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf 
{"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
"topology.tick.tuple.freq.secs" nil, 
"topology.builtin.metrics.bucket.size.secs" 60, 
"topology.fall.back.on.java.serialization" true, 
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
"topology.skip.missing.kryo.registrations" true, 
"storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", 
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
"topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 
10, "logviewer.childopts" "-Xmx128m", "java.library.path" 
"/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 
1024, "storm.local.dir" 
"/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//44b7d975-ab03-4aed-91e6-bceb6f4d0042",
 "storm.messaging.netty.buffer_size" 5242880, 
"supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" 
true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 
3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, 
"nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, 
"storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, 
"topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" 
nil, "storm.zookeeper.root" "/storm", 
"storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, 
"storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" 
["localhost"], "transactional.zookeeper.root" "/transactional", 
"topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, 
"topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" 
"-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, 
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
"supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
"topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
"topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", 
"topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
"topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
"backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (4 5 
6), "topology.debug" false, "nimbus.task.launch.secs" 120, 
"nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, 
"task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" 
"-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
"worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
"backtype.storm.serialization.types.ListDelegateSerializer", 
"topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", 
"nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, 
"topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", 
"drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, 
"storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
"backtype.storm.security.auth.SimpleTransportPlugin", 
"topology.state.synchronization.timeout.secs" 60, 
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, 
"storm.messaging.transport" "backtype.storm.messaging.zmq", 
"logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
"drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, 
"nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
"topology.optimize" true, "topology.max.task.parallelism" nil}
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: 
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id 
16a5efae-8c0a-40d4-a4c1-c232937f4ed7 at host 192.168.128.10
[main] INFO backtype.storm.daemon.nimbus - Received topology submission for 
kafka with conf {"topology.acker.executors" nil, "topology.kryo.register" 
{"storm.trident.topology.TransactionAttempt" nil}, "topology.kryo.decorators" 
(), "topology.name" "kafka", "storm.id" "kafka-1-1406704138", 
"topology.max.task.parallelism" 3, "topology.debug" true, 
"topology.max.spout.pending" 1}
[main] INFO backtype.storm.daemon.nimbus - Activating kafka: kafka-1-1406704138
[main] INFO backtype.storm.scheduler.EvenScheduler - Available slots: 
(["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1] 
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 2] 
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 3] 
["16a5efae-8c0a-40d4-a4c1-c232937f4ed7" 4] 
["16a5efae-8c0a-40d4-a4c1-c232937f4ed7" 5] 
["16a5efae-8c0a-40d4-a4c1-c232937f4ed7" 6])
[main] INFO backtype.storm.daemon.nimbus - Setting new assignment for topology 
id kafka-1-1406704138: 
#backtype.storm.daemon.common.Assignment{:master-code-dir 
"/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//d170038a-0f91-4124-b37c-2b5008188baa/nimbus/stormdist/kafka-1-1406704138",
 :node->host {"ee70aeee-1404-446b-97b1-2f432ec42a3d" "192.168.128.10"}, 
:executor->node+port {[3 3] ["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [5 5] 
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [4 4] 
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [2 2] 
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [1 1] 
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1]}, :executor->start-time-secs {[1 1] 
1406704139, [2 2] 1406704139, [4 4] 1406704139, [5 5] 1406704139, [3 3] 
1406704139}}
[main] INFO backtype.storm.daemon.nimbus - Shutting down master
[main] INFO backtype.storm.daemon.nimbus - Shut down master
[main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor 
ee70aeee-1404-446b-97b1-2f432ec42a3d
[Thread-8] INFO backtype.storm.event - Event manager interrupted
[Thread-9] INFO backtype.storm.event - Event manager interrupted
[main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor 
16a5efae-8c0a-40d4-a4c1-c232937f4ed7
[Thread-12] INFO backtype.storm.event - Event manager interrupted
[Thread-13] INFO backtype.storm.event - Event manager interrupted
[main] INFO backtype.storm.testing - Shutting down in process zookeeper
[main] INFO backtype.storm.testing - Done shutting down in process zookeeper
[main] INFO backtype.storm.testing - Deleting temporary path 
/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//d170038a-0f91-4124-b37c-2b5008188baa
[main] INFO backtype.storm.testing - Deleting temporary path 
/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//4b0cc44b-8d8c-459c-b400-03769fe2a7df
[main] INFO backtype.storm.testing - Deleting temporary path 
/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//724c7721-297a-42aa-87b1-7c3bc8a4dd24
[main] INFO backtype.storm.testing - Deleting temporary path 
/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//44b7d975-ab03-4aed-91e6-bceb6f4d0042

Any ideas about this problem?

Thanks,

Alec

On Jul 29, 2014, at 11:17 PM, Palak Shah <[email protected]> wrote:

> Hi, 
> 
> I am using the Kafka spout that is integrated in the
> apache-storm-0.9.2-incubating release. I am able to submit the topology to my
> Storm cluster, but it is not receiving any tuples from the Kafka topic. I
> know the topic ("page_visits") has data because I can read it from the
> console.
> 
> Here is my code for the topology:
> 
> public static void main(String[] args)
>         throws AlreadyAliveException, InvalidTopologyException {
>     BrokerHosts zkHost = new ZkHosts("localhost:2181");
>     SpoutConfig spoutConfig = new SpoutConfig(
>             zkHost,          // ZooKeeper hosts used to discover the Kafka brokers
>             "page_visits",   // topic to read from
>             "/zkroot",       // root path in ZooKeeper where the spout stores consumer offsets
>             "discovery");    // id for this consumer, used when storing offsets in ZooKeeper
>     spoutConfig.forceFromStart = true;
>     KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
>     TopologyBuilder builder = new TopologyBuilder();
>     builder.setSpout("kafkaSpout", kafkaSpout);
>     builder.setBolt("kafkaBolt", new PrinterBolt()).shuffleGrouping("kafkaSpout");
>
>     Config conf = new Config();
>     conf.setNumWorkers(4);
>     conf.setDebug(true);
>
>     StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
> }
> 
> I am using apache-storm-0.9.2-incubating and kafka_2.9.2-0.8.1.1. Is this a
> version compatibility issue? If so, which versions should I use for this to
> work?
> 
> Thanks in Advance,
> Palak Shah
