Hi,

Looks like the spout is not able to find a topic named *test-topic*. Can you
check and confirm whether a topic named *test-topic* actually exists on Kafka?
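
A related thing to check, going by the log rather than anything confirmed in
this thread: the broker complains about partition [test-topic,3], and the
spout was built with StaticHosts.fromHostString(hosts, 4). Assuming that
second argument is the number of partitions per host, the spout would ask for
partitions 0 to 3, so a topic with fewer than 4 partitions could produce the
same error even if the topic exists. A small standalone sketch of that check
(PartitionCheck and missingPartitions are made-up names for illustration, not
part of storm-kafka or Kafka):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionCheck {
    // Hypothetical helper: lists the partition ids a spout configured for
    // `configured` partitions (ids 0 .. configured-1) would request that a
    // topic with `actual` partitions does not have.
    public static List<Integer> missingPartitions(int configured, int actual) {
        List<Integer> missing = new ArrayList<Integer>();
        for (int p = Math.max(actual, 0); p < configured; p++) {
            missing.add(p);
        }
        return missing;
    }

    public static void main(String[] args) {
        // Spout configured for 4 partitions, topic created with 3.
        System.out.println(missingPartitions(4, 3)); // prints [3]
        // Topic does not exist at all (0 partitions).
        System.out.println(missingPartitions(4, 0)); // prints [0, 1, 2, 3]
    }
}
```

If the list is not empty, either recreate the topic with enough partitions or
pass the topic's real partition count when building the hosts.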

Thanks
Bijoy


On Fri, Feb 21, 2014 at 5:14 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEXIN) <
[email protected]> wrote:

> Hi,
>
> I am trying to run a simple program that reads messages from a Kafka topic
> using the storm-kafka spout. Here is a snippet of the program:
>
> List<String> hosts = new ArrayList<String>();
> hosts.add("127.0.0.1:9092"); // Kafka test server running on this port
> SpoutConfig kafkaConf = new SpoutConfig(
>     StaticHosts.fromHostString(hosts, 4), KafkaSetup.TEST_TOPIC,
>     "/kafkastorm", "discovery");
> kafkaConf.forceStartOffsetTime(-2);
> kafkaConf.zkServers = ImmutableList.of("127.0.0.1");
> kafkaConf.zkPort = 2181;
> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);
>
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("spout", kafkaSpout);
> builder.setBolt("printer", new PrinterBolt())
>     .shuffleGrouping("spout");
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology("SimpleTopology", conf, builder.createTopology());
>
> When creating the Storm-Kafka spout, the program exits with the following
> error:
>
>
> 309 [Thread-28-spout] INFO backtype.storm.daemon.executor - invoke - Activating spout spout:(4)
> 6317 [kafka-request-handler-4] WARN kafka.server.KafkaApis - warn - [KafkaApi-0] Fetch request with correlation id 0 from client on partition [test-topic,3] failed due to Partition [test-topic,3] doesn't exist on 0
> 6329 [Thread-28-spout] ERROR storm.kafka.KafkaUtils - fetchMessages - Received error while fetching messages from topic: test-topicon partition: 3
> 6330 [Thread-28-spout] ERROR backtype.storm.util - invoke - Async loop died!
> java.lang.NullPointerException
> at storm.kafka.PartitionManager.fill(PartitionManager.java:138)
> at storm.kafka.PartitionManager.next(PartitionManager.java:108)
> at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:117)
> at backtype.storm.daemon.executor$fn__1136$fn__1151$fn__1180.invoke(executor.clj:547)
> at backtype.storm.util$async_loop$fn__458.invoke(util.clj:403)
> at clojure.lang.AFn.run(AFn.java:24)
> at java.lang.Thread.run(Thread.java:619)
>
> Am I doing anything wrong here?
>
> Thanks,
> Saurabh.
>
>
> PS:
>
> Here are the versions of the different components:
>
> storm version 0.9.0.1
>
> zookeeper version 3.4.5
>
> Kafka version 0.8.0-beta1
>
> Kafka Spout version storm-kafka-0.9.0-wip16a ( with patch to work with
> Kafka 0.8)
>
>
>
>
