The default number of Kafka partitions is 10, and the partition count you pass to the spout has to match what the broker actually has for the topic. You can either change it to 10 in your code, modify the number of partitions in the server.properties file, or use KafkaConfig.ZkHosts for dynamic detection of Kafka brokers and partitions.
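If it helps, here is a rough sketch of the check I have in mind: compare the count you hand to the spout with num.partitions from server.properties. The class and method names are made up for illustration, the sample properties text is hypothetical, and it assumes the topic was auto-created with the broker's num.partitions value (a topic created explicitly keeps whatever count it was given).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of storm-kafka or Kafka.
class PartitionCheck {

    // Reads num.partitions from the text of a server.properties file.
    static int brokerPartitions(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("num.partitions");
        if (value == null) {
            throw new IllegalArgumentException("num.partitions is not set");
        }
        return Integer.parseInt(value.trim());
    }

    // The spout fetches from partitions 0..spoutPartitions-1, so the
    // count given to the spout must equal the broker-side count.
    static boolean spoutCountMatches(int spoutPartitions, String serverProperties) {
        return spoutPartitions == brokerPartitions(serverProperties);
    }

    public static void main(String[] args) {
        // Sample content only; substitute the text of your own server.properties.
        String sample = "broker.id=0\nnum.partitions=2\n";
        int spoutPartitions = 4; // the value passed to StaticHosts.fromHostString(hosts, 4)
        if (!spoutCountMatches(spoutPartitions, sample)) {
            System.out.println("Mismatch: spout expects " + spoutPartitions
                    + " partitions, broker is configured for " + brokerPartitions(sample));
        }
    }
}
```

With a mismatch like the one in the sample, the spout ends up asking for a partition the broker does not have, which is the kind of "doesn't exist" warning shown in your log.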
On Fri, Feb 21, 2014 at 12:44 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEXIN) <[email protected]> wrote:
> Hi,
>
> I am trying to run a simple program that reads messages from Kafka topic
> using storm-kafka spout. Here is the snip of the simple program ->
>
> List<String> hosts = new ArrayList<String>();
> hosts.add("127.0.0.1:9092");//Kafka test server running on this port
> SpoutConfig kafkaConf = new
> SpoutConfig(StaticHosts.fromHostString(hosts,4), KafkaSetup.TEST_TOPIC,
> "/kafkastorm", "discovery");
> kafkaConf.forceStartOffsetTime(-2);
> kafkaConf.zkServers = ImmutableList.of("127.0.0.1");
> kafkaConf.zkPort = 2181;
> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);
>
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("spout", kafkaSpout);
> builder.setBolt("printer", new PrinterBolt())
> .shuffleGrouping("spout");
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology("SimpleTopology", conf, builder.createTopology());
>
> When creating the Storm-Kafka spout, the program exit with the following
> error ->
>
>
> 309 [Thread-28-spout] INFO backtype.storm.daemon.executor - invoke -
> Activating spout spout:(4)
> 6317 [kafka-request-handler-4] WARN kafka.server.KafkaApis - warn -
> [KafkaApi-0] Fetch request with correlation id 0 from client on partition
> [test-topic,3] failed due to Partition [test-topic,3] doesn't exist on 0
> 6329 [Thread-28-spout] ERROR storm.kafka.KafkaUtils - fetchMessages -
> Received error while fetching messages from topic: test-topicon partition: 3
> 6330 [Thread-28-spout] ERROR backtype.storm.util - invoke - Async loop
> died!
> java.lang.NullPointerException
> at storm.kafka.PartitionManager.fill(PartitionManager.java:138)
> at storm.kafka.PartitionManager.next(PartitionManager.java:108)
> at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:117)
> at
> backtype.storm.daemon.executor$fn__1136$fn__1151$fn__1180.invoke(executor.clj:547)
> at backtype.storm.util$async_loop$fn__458.invoke(util.clj:403)
> at clojure.lang.AFn.run(AFn.java:24)
> at java.lang.Thread.run(Thread.java:619)
>
> Am I doing anything wrong here?
>
> Thanks,
> Saurabh.
>
>
> PS:
>
> here are versions of different components
>
> storm version 0.9.0.1
>
> zookeeper version 3.4.5
>
> Kafka version 0.8.0-beta1
>
> Kafka Spout version storm-kafka-0.9.0-wip16a ( with patch to work with
> Kafka 0.8)
>
>

--
With Regards,
Vinoth Kumar K
