Hi all,
I had a similar issue, but I am using storm-kafka
(https://github.com/apache/incubator-storm/tree/master/external/storm-kafka). It
uses a TridentTopology, which is convenient for parsing the message if the tuple
is JSON. Here is my code:
public static class PrintStream implements Filter {
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.out.println(tuple);
        return true;
    }
}
public static StormTopology buildTopology(LocalDRPC drpc) throws IOException {
    TridentTopology topology = new TridentTopology();
    BrokerHosts zk = new ZkHosts("localhost");
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
    topology.newStream("kafka", spout)
            .each(new Fields("str"), new PrintStream());
    return topology.build();
}
public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setDebug(true);
    conf.setMaxSpoutPending(1);
    conf.setMaxTaskParallelism(3);
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafka", conf, buildTopology(drpc));
    Thread.sleep(100);
    cluster.shutdown();
}
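The JSON-parsing step mentioned above can be sketched without Storm at all. Below is a minimal, framework-free illustration of pulling one field out of a flat JSON string, roughly what a Trident function downstream of the spout would do on the "str" field. The class name JsonFieldSketch and the extractField helper are purely illustrative, not part of storm-kafka, and a real topology would use a proper JSON library rather than a regex:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonFieldSketch {
    // Extract the string value of a top-level key from a flat JSON object.
    // Handles only unescaped string values; this is a sketch, not a parser.
    static String extractField(String json, String key) {
        Pattern p = Pattern.compile(
                "\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String msg = "{\"user\": \"alice\", \"page\": \"/home\"}";
        System.out.println(extractField(msg, "user")); // prints "alice"
        System.out.println(extractField(msg, "page")); // prints "/home"
    }
}
```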
I was expecting it to print the messages generated by a Kafka producer (I can
read them with kafka-console-consumer), but although the code runs, nothing is
displayed on screen:
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update:
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update:
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update:
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf
{"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
"topology.tick.tuple.freq.secs" nil,
"topology.builtin.metrics.bucket.size.secs" 60,
"topology.fall.back.on.java.serialization" true,
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
"topology.skip.missing.kryo.registrations" true,
"storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m",
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
"topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs"
10, "logviewer.childopts" "-Xmx128m", "java.library.path"
"/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size"
1024, "storm.local.dir"
"/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//724c7721-297a-42aa-87b1-7c3bc8a4dd24",
"storm.messaging.netty.buffer_size" 5242880,
"supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts"
true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs"
3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4,
"nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100,
"storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil,
"topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers"
nil, "storm.zookeeper.root" "/storm",
"storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true,
"storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers"
["localhost"], "transactional.zookeeper.root" "/transactional",
"topology.acker.executors" nil, "topology.transfer.buffer.size" 1024,
"topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts"
"-Xmx768m", "supervisor.heartbeat.frequency.secs" 5,
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
"supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
"topology.tasks" nil, "storm.messaging.netty.max_retries" 30,
"topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy",
"topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000,
"topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator"
"backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1 2
3), "topology.debug" false, "nimbus.task.launch.secs" 120,
"nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30,
"task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts"
"-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
"worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
"backtype.storm.serialization.types.ListDelegateSerializer",
"topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy",
"nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000,
"topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory",
"drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1,
"storm.zookeeper.retry.times" 5, "storm.thrift.transport"
"backtype.storm.security.auth.SimpleTransportPlugin",
"topology.state.synchronization.timeout.secs" 60,
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600,
"storm.messaging.transport" "backtype.storm.messaging.zmq",
"logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000,
"drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080,
"nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
"topology.optimize" true, "topology.max.task.parallelism" nil}
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update:
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id
ee70aeee-1404-446b-97b1-2f432ec42a3d at host 192.168.128.10
[main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf
{"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
"topology.tick.tuple.freq.secs" nil,
"topology.builtin.metrics.bucket.size.secs" 60,
"topology.fall.back.on.java.serialization" true,
"topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
"topology.skip.missing.kryo.registrations" true,
"storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m",
"storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
"topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs"
10, "logviewer.childopts" "-Xmx128m", "java.library.path"
"/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size"
1024, "storm.local.dir"
"/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//44b7d975-ab03-4aed-91e6-bceb6f4d0042",
"storm.messaging.netty.buffer_size" 5242880,
"supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts"
true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs"
3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4,
"nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100,
"storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil,
"topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers"
nil, "storm.zookeeper.root" "/storm",
"storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true,
"storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers"
["localhost"], "transactional.zookeeper.root" "/transactional",
"topology.acker.executors" nil, "topology.transfer.buffer.size" 1024,
"topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts"
"-Xmx768m", "supervisor.heartbeat.frequency.secs" 5,
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
"supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
"topology.tasks" nil, "storm.messaging.netty.max_retries" 30,
"topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy",
"topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000,
"topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator"
"backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (4 5
6), "topology.debug" false, "nimbus.task.launch.secs" 120,
"nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30,
"task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts"
"-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
"worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
"backtype.storm.serialization.types.ListDelegateSerializer",
"topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy",
"nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000,
"topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory",
"drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1,
"storm.zookeeper.retry.times" 5, "storm.thrift.transport"
"backtype.storm.security.auth.SimpleTransportPlugin",
"topology.state.synchronization.timeout.secs" 60,
"supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600,
"storm.messaging.transport" "backtype.storm.messaging.zmq",
"logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000,
"drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080,
"nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
"topology.optimize" true, "topology.max.task.parallelism" nil}
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update:
:connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id
16a5efae-8c0a-40d4-a4c1-c232937f4ed7 at host 192.168.128.10
[main] INFO backtype.storm.daemon.nimbus - Received topology submission for
kafka with conf {"topology.acker.executors" nil, "topology.kryo.register"
{"storm.trident.topology.TransactionAttempt" nil}, "topology.kryo.decorators"
(), "topology.name" "kafka", "storm.id" "kafka-1-1406704138",
"topology.max.task.parallelism" 3, "topology.debug" true,
"topology.max.spout.pending" 1}
[main] INFO backtype.storm.daemon.nimbus - Activating kafka: kafka-1-1406704138
[main] INFO backtype.storm.scheduler.EvenScheduler - Available slots:
(["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1]
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 2]
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 3]
["16a5efae-8c0a-40d4-a4c1-c232937f4ed7" 4]
["16a5efae-8c0a-40d4-a4c1-c232937f4ed7" 5]
["16a5efae-8c0a-40d4-a4c1-c232937f4ed7" 6])
[main] INFO backtype.storm.daemon.nimbus - Setting new assignment for topology
id kafka-1-1406704138:
#backtype.storm.daemon.common.Assignment{:master-code-dir
"/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//d170038a-0f91-4124-b37c-2b5008188baa/nimbus/stormdist/kafka-1-1406704138",
:node->host {"ee70aeee-1404-446b-97b1-2f432ec42a3d" "192.168.128.10"},
:executor->node+port {[3 3] ["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [5 5]
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [4 4]
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [2 2]
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [1 1]
["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1]}, :executor->start-time-secs {[1 1]
1406704139, [2 2] 1406704139, [4 4] 1406704139, [5 5] 1406704139, [3 3]
1406704139}}
[main] INFO backtype.storm.daemon.nimbus - Shutting down master
[main] INFO backtype.storm.daemon.nimbus - Shut down master
[main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor
ee70aeee-1404-446b-97b1-2f432ec42a3d
[Thread-8] INFO backtype.storm.event - Event manager interrupted
[Thread-9] INFO backtype.storm.event - Event manager interrupted
[main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor
16a5efae-8c0a-40d4-a4c1-c232937f4ed7
[Thread-12] INFO backtype.storm.event - Event manager interrupted
[Thread-13] INFO backtype.storm.event - Event manager interrupted
[main] INFO backtype.storm.testing - Shutting down in process zookeeper
[main] INFO backtype.storm.testing - Done shutting down in process zookeeper
[main] INFO backtype.storm.testing - Deleting temporary path
/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//d170038a-0f91-4124-b37c-2b5008188baa
[main] INFO backtype.storm.testing - Deleting temporary path
/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//4b0cc44b-8d8c-459c-b400-03769fe2a7df
[main] INFO backtype.storm.testing - Deleting temporary path
/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//724c7721-297a-42aa-87b1-7c3bc8a4dd24
[main] INFO backtype.storm.testing - Deleting temporary path
/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//44b7d975-ab03-4aed-91e6-bceb6f4d0042
Any ideas about this problem?
Thanks,
Alec
On Jul 29, 2014, at 11:17 PM, Palak Shah <[email protected]> wrote:
> Hi,
>
> I am using the Kafka spout that is integrated in the
> apache-storm-0.9.2-incubating release. I am able to submit the topology to my
> Storm cluster, but it is not receiving any tuples from the Kafka topic. I
> know the topic ("page_visits") has data because I can read it from the
> console.
>
> Here is my code for topology :
>
> public static void main(String[] args) throws AlreadyAliveException,
>         InvalidTopologyException {
>     BrokerHosts zkHost = new ZkHosts("localhost:2181");
>     SpoutConfig spoutConfig = new SpoutConfig(
>             zkHost,         // Zookeeper connect string used to discover the Kafka brokers
>             "page_visits",  // topic to read from
>             "/zkroot",      // root path in Zookeeper where the spout stores consumer offsets
>             "discovery");   // id for this consumer, used when storing offsets in Zookeeper
>     spoutConfig.forceFromStart = true;
>     KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
>     TopologyBuilder builder = new TopologyBuilder();
>     builder.setSpout("kafkaSpout", kafkaSpout);
>     builder.setBolt("kafkaBolt", new PrinterBolt()).shuffleGrouping("kafkaSpout");
>
>     Config conf = new Config();
>     conf.setNumWorkers(4);
>     conf.setDebug(true);
>
>     StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
> }
>
>
> I am using apache-storm-0.9.2-incubating and kafka_2.9.2-0.8.1.1. Is this a
> version compatibility issue? If so, which versions should I use for this to
> work?
>
> Thanks in Advance,
> Palak Shah