Hi,
I am trying to run a simple program that reads messages from a Kafka topic using
the storm-kafka spout. Here is a snippet of the program:
List<String> hosts = new ArrayList<String>();
hosts.add("127.0.0.1:9092"); // Kafka test server running on this port
SpoutConfig kafkaConf = new SpoutConfig(StaticHosts.fromHostString(hosts, 4),
    KafkaSetup.TEST_TOPIC, "/kafkastorm", "discovery");
kafkaConf.forceStartOffsetTime(-2);
kafkaConf.zkServers = ImmutableList.of("127.0.0.1");
kafkaConf.zkPort = 2181;
kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", kafkaSpout);
builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("spout");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("SimpleTopology", conf, builder.createTopology());
When creating the storm-kafka spout, the program exits with the following error:
309 [Thread-28-spout] INFO backtype.storm.daemon.executor - invoke - Activating spout spout:(4)
6317 [kafka-request-handler-4] WARN kafka.server.KafkaApis - warn - [KafkaApi-0] Fetch request with correlation id 0 from client on partition [test-topic,3] failed due to Partition [test-topic,3] doesn't exist on 0
6329 [Thread-28-spout] ERROR storm.kafka.KafkaUtils - fetchMessages - Received error while fetching messages from topic: test-topicon partition: 3
6330 [Thread-28-spout] ERROR backtype.storm.util - invoke - Async loop died!
java.lang.NullPointerException
    at storm.kafka.PartitionManager.fill(PartitionManager.java:138)
    at storm.kafka.PartitionManager.next(PartitionManager.java:108)
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:117)
    at backtype.storm.daemon.executor$fn__1136$fn__1151$fn__1180.invoke(executor.clj:547)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:403)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:619)
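One detail that may be relevant: the WARN line names partition 3, and the snippet passes 4 as the second argument to StaticHosts.fromHostString. The sketch below only illustrates how those two numbers could be related. It assumes, without confirmation, that the spout requests partitions 0 through n-1 on the listed broker and that the topic has fewer partitions than that. The class PartitionCheck, the method missingPartitions, and the "actual" value are hypothetical and are not part of storm-kafka or Kafka.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionCheck {
    // Hypothetical helper: returns the partition ids in 0..declared-1
    // that fall outside 0..actual-1, i.e. ids that would be requested
    // but do not exist on the topic.
    static List<Integer> missingPartitions(int declared, int actual) {
        List<Integer> missing = new ArrayList<Integer>();
        for (int p = Math.max(actual, 0); p < declared; p++) {
            missing.add(p);
        }
        return missing;
    }

    public static void main(String[] args) {
        int declared = 4; // second argument to StaticHosts.fromHostString in the snippet
        int actual = 3;   // assumption for illustration only; not read from the broker
        System.out.println("Requested but missing: " + missingPartitions(declared, actual));
    }
}
```

If that assumption holds, the count given to StaticHosts.fromHostString would need to match the number of partitions test-topic actually has on the broker.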
Am I doing anything wrong here?
Thanks,
Saurabh.
PS:
Here are the versions of the components involved:
Storm version 0.9.0.1
ZooKeeper version 3.4.5
Kafka version 0.8.0-beta1
Kafka spout version storm-kafka-0.9.0-wip16a (with a patch to work with Kafka 0.8)