Hi, Looks like it is not able to find any topic name *test-topic*.Can you check and confirm if a topic named *test-topic* actually exist on Kafka.
Thanks Bijoy On Fri, Feb 21, 2014 at 5:14 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEXIN) < [email protected]> wrote: > Hi, > > I am trying to run a simple program that reads messages from Kafka topic > using storm-kafka spout. Here is the snip of the simple program -> > > List<String> hosts = new ArrayList<String>(); > hosts.add("127.0.0.1:9092");//Kafka test server running on this port > SpoutConfig kafkaConf = new > SpoutConfig(StaticHosts.fromHostString(hosts,4), KafkaSetup.TEST_TOPIC, > "/kafkastorm", "discovery"); > kafkaConf.forceStartOffsetTime(-2); > kafkaConf.zkServers = ImmutableList.of("127.0.0.1"); > kafkaConf.zkPort = 2181; > kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme()); > KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf); > > TopologyBuilder builder = new TopologyBuilder(); > builder.setSpout("spout", kafkaSpout); > builder.setBolt("printer", new PrinterBolt()) > .shuffleGrouping("spout"); > LocalCluster cluster = new LocalCluster(); > cluster.submitTopology("SimpleTopology", conf, builder.createTopology()); > > When creating the Storm-Kafka spout, the program exit with the following > error -> > > > 309 [Thread-28-spout] INFO backtype.storm.daemon.executor - invoke - > Activating spout spout:(4) > 6317 [kafka-request-handler-4] WARN kafka.server.KafkaApis - warn - > [KafkaApi-0] Fetch request with correlation id 0 from client on partition > [test-topic,3] failed due to Partition [test-topic,3] doesn't exist on 0 > 6329 [Thread-28-spout] ERROR storm.kafka.KafkaUtils - fetchMessages - > Received error while fetching messages from topic: test-topicon partition: 3 > 6330 [Thread-28-spout] ERROR backtype.storm.util - invoke - Async loop > died! > java.lang.NullPointerException > at storm.kafka.PartitionManager.fill(PartitionManager.java:138) > at storm.kafka.PartitionManager.next(PartitionManager.java:108) > at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:117) > at > backtype.storm.daemon.executor$fn__1136$fn__1151$fn__1180.invoke(executor.clj:547) > at backtype.storm.util$async_loop$fn__458.invoke(util.clj:403) > at clojure.lang.AFn.run(AFn.java:24) > at java.lang.Thread.run(Thread.java:619) > > Am I doing anything wrong here? > > Thanks, > Saurabh. > > > PS: > > here are versions of different components > > storm version 0.9.0.1 > > zookeeper version 3.4.5 > > Kafka version 0.8.0-beta1 > > Kafka Spout version storm-kafka-0.9.0-wip16a ( with patch to work with > Kafka 0.8) > > > >
